/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016-2019 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: we_dmlcommandproc.cpp 3082 2011-09-26 22:00:38Z chao $

#include <unistd.h>
#include "bytestream.h"
using namespace messageqcpp;

#include "we_messages.h"
#include "we_message_handlers.h"
#include "../../dbcon/dmlpackage/dmlpkg.h"
#include "we_dmlcommandproc.h"
using namespace dmlpackage;
#include "dmlpackageprocessor.h"
using namespace dmlpackageprocessor;
#include "dataconvert.h"
using namespace dataconvert;
#include "calpontsystemcatalog.h"
#include "sessionmanager.h"
using namespace execplan;
#include "messagelog.h"
#include "stopwatch.h"
#include "idberrorinfo.h"
#include "errorids.h"
using namespace logging;

#include "brmtypes.h"
using namespace BRM;
#include "we_tablemetadata.h"
#include "we_dbrootextenttracker.h"
#include "we_bulkrollbackmgr.h"
#include "we_define.h"
#include "we_confirmhdfsdbfile.h"
#include "cacheutils.h"
#include "IDBDataFile.h"
#include "IDBPolicy.h"
#include "checks.h"
#include "columnwidth.h"

using namespace std;
using namespace utils;

namespace WriteEngine
{
// StopWatch timer;
// Default constructor: seeds the extent-layout settings with built-in
// defaults and then lets the configuration file override the ones that
// are still configurable.
WE_DMLCommandProc::WE_DMLCommandProc()
{
  fIsFirstBatchPm = true;

  // Built-in fallbacks, kept when the configuration has no value.
  extentRows = 0x800000;
  filesPerColumnPartition = 8;
  dbrootCnt = 1;

  // MCOL-4685: the number of extents per segment file (ExtentsPerSegmentFile)
  // is no longer configurable; the compiled-in default is always used.
  extentsPerSegmentFile = DEFAULT_EXTENTS_PER_SEGMENT_FILE;

  config::Config* sysConfig = config::Config::makeConfig();

  // An empty string is treated as "not configured".
  string filesPerPartitionText = sysConfig->getConfig("ExtentMap", "FilesPerColumnPartition");

  if (!filesPerPartitionText.empty())
  {
    filesPerColumnPartition = sysConfig->uFromText(filesPerPartitionText);
  }

  string dbRootCountText = sysConfig->getConfig("SystemConfig", "DBRootCount");

  if (!dbRootCountText.empty())
  {
    dbrootCnt = sysConfig->uFromText(dbRootCountText);
  }
}

// Copy constructor.
//
// Copies the first-batch flag and the extent-layout settings from rhs, and
// gives the copy its own, freshly created "BatchInsert" rollback meta writer
// (the writer is not shared with rhs).
//
// The extent-layout settings are copied explicitly because the default
// constructor, which is the only other place they are set, does not run for
// a copy; without this they would be left indeterminate in the new object.
WE_DMLCommandProc::WE_DMLCommandProc(const WE_DMLCommandProc& rhs)
{
  fIsFirstBatchPm = rhs.fIsFirstBatchPm;
  filesPerColumnPartition = rhs.filesPerColumnPartition;
  extentsPerSegmentFile = rhs.extentsPerSegmentFile;
  dbrootCnt = rhs.dbrootCnt;
  extentRows = rhs.extentRows;
  fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
}

// Destructor: empties dbRootExtTrackerVec, dropping every entry it holds.
WE_DMLCommandProc::~WE_DMLCommandProc()
{
  dbRootExtTrackerVec.clear();
}

// Appends the values for the table's AUX column to the per-column lists.
//
// One column tuple holding the uint8_t value 1 and one default-constructed
// NullString are produced for every element of origVals; only the number of
// elements in origVals is used, not their content.
//
// @param origVals       values of an already processed column; supplies the count
// @param colValuesList  receives the list of AUX column tuples
// @param dicStringList  receives the matching list of placeholder strings
void WE_DMLCommandProc::processAuxCol(const std::vector<utils::NullString>& origVals,
                                      WriteEngine::ColValueList& colValuesList,
                                      WriteEngine::DictStrList& dicStringList)
{
  // Every entry is identical, so build the prototypes once and copy them.
  WriteEngine::ColTuple auxTuplePrototype;
  auxTuplePrototype.data = static_cast<uint8_t>(1);
  const utils::NullString emptyDictString;

  WriteEngine::ColTupleList auxTuples;
  WriteEngine::dictStr auxStrings;

  for (size_t remaining = origVals.size(); remaining > 0; --remaining)
  {
    auxTuples.push_back(auxTuplePrototype);
    //@Bug 2515. Only pass string values to write engine
    auxStrings.push_back(emptyDictString);
  }

  colValuesList.push_back(auxTuples);
  dicStringList.push_back(auxStrings);
}

// Processes a single INSERT request.
//
// The byte stream is expected to contain, in order: the transaction id
// (quadbyte), the target DBRoot (quadbyte), a package-type byte and a
// serialized InsertDMLPackage.  The rows of the package are converted into
// write-engine column structures / tuples and written through
// fWEWrapper.insertColumnRec_Single(); the touched files are then flushed.
// When IDBPolicy::useHdfs() is true, bulk-rollback meta data is saved before
// the write and the changes are confirmed (or rolled back) afterwards.
//
// @param bs   request stream; consumed by this call
// @param err  cleared on entry; receives the error or warning text
// @return 0 (NO_ERROR) on success,
//         DMLPackageProcessor::IDBRANGE_WARNING when a value was truncated or
//         saturated, DEAD_LOCK_ERROR / VB_OVERFLOW_ERROR / INSERT_ERROR for the
//         corresponding write-engine failures, and 1 for every other failure.
uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std::string& err)
{
  uint8_t rc = 0;
  err.clear();
  InsertDMLPackage insertPkg;
  ByteStream::quadbyte tmp32;
  bs >> tmp32;
  BRM::TxnID txnid;
  txnid.valid = true;
  txnid.id = tmp32;
  bs >> tmp32;
  uint32_t dbroot = tmp32;

  // The package type is not used here, but it has to be read so that the
  // stream is positioned at the start of the serialized package.
  messageqcpp::ByteStream::byte packageType;
  bs >> packageType;
  insertPkg.read(bs);
  uint32_t sessionId = insertPkg.get_SessionID();
  DMLTable* tablePtr = insertPkg.get_Table();
  RowList rows = tablePtr->get_RowList();

  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::DictStrList dicStringList;
  CalpontSystemCatalog::TableName tableName;
  CalpontSystemCatalog::TableColName tableColName;
  tableName.table = tableColName.table = tablePtr->get_TableName();
  tableName.schema = tableColName.schema = tablePtr->get_SchemaName();

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::ROPair tableRoPair;
  CalpontSystemCatalog::OID tableAUXColOid;
  // Names of the columns whose value triggered a truncation/range warning.
  std::vector<string> colNames;
  bool isWarningSet = false;

  try
  {
    tableRoPair = systemCatalogPtr->tableRID(tableName);
    tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName);

    if (rows.size())
    {
      // ------------------------------------------------------------------
      // Step 1: describe every column of the first row to the write engine.
      // ------------------------------------------------------------------
      Row* rowPtr = rows.at(0);
      ColumnList columns = rowPtr->get_ColumnList();
      unsigned int numcols = rowPtr->get_NumberOfColumns();
      // "+ 1" leaves room for the optional AUX column appended below.
      cscColTypeList.reserve(numcols + 1);
      // WIP
      // We presume that DictCols number is low
      colStructs.reserve(numcols + 1);
      ColumnList::const_iterator column_iterator = columns.begin();

      while (column_iterator != columns.end())
      {
        DMLColumn* columnPtr = *column_iterator;
        tableColName.column = columnPtr->get_Name();
        // TODO MCOL-641 replace with getColRidsOidsTypes()
        CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName);

        CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);

        CalpontSystemCatalog::ColType colType;
        colType = systemCatalogPtr->colType(oid);

        WriteEngine::ColStruct colStruct;
        colStruct.fColDbRoot = dbroot;
        WriteEngine::DctnryStruct dctnryStruct;
        dctnryStruct.fColDbRoot = dbroot;
        colStruct.dataOid = roPair.objnum;
        colStruct.tokenFlag = false;
        colStruct.fCompressionType = colType.compressionType;

        // Token
        if (isDictCol(colType))
        {
          // WIP Hardcoded value
          colStruct.colWidth = 8;
          colStruct.tokenFlag = true;
        }
        else
        {
          colStruct.colWidth = colType.colWidth;
        }

        colStruct.colDataType = colType.colDataType;

        dctnryStruct.fCharsetNumber = colType.charsetNumber;

        // Only dictionary (token) columns carry a real dictionary OID;
        // the remaining dictionary fields are filled identically either way.
        if (colStruct.tokenFlag)
        {
          dctnryStruct.dctnryOid = colType.ddn.dictOID;
          dctnryStruct.columnOid = colStruct.dataOid;
          dctnryStruct.fCompressionType = colType.compressionType;
          dctnryStruct.colWidth = colType.colWidth;
        }
        else
        {
          dctnryStruct.dctnryOid = 0;
          dctnryStruct.columnOid = colStruct.dataOid;
          dctnryStruct.fCompressionType = colType.compressionType;
          dctnryStruct.colWidth = colType.colWidth;
        }

        colStructs.push_back(colStruct);
        dctnryStructList.push_back(dctnryStruct);
        cscColTypeList.push_back(colType);

        ++column_iterator;
      }

      // MCOL-5021 Valid AUX column OID for a table is > 3000
      // Tables that were created before this feature was added will have
      // tableAUXColOid = 0
      if (tableAUXColOid > 3000)
      {
        CalpontSystemCatalog::ColType colType;
        colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
        colType.colWidth = execplan::AUX_COL_WIDTH;
        colType.colDataType = execplan::AUX_COL_DATATYPE;
        WriteEngine::ColStruct colStruct;
        colStruct.fColDbRoot = dbroot;
        WriteEngine::DctnryStruct dctnryStruct;
        dctnryStruct.fColDbRoot = dbroot;
        colStruct.dataOid = tableAUXColOid;
        colStruct.tokenFlag = false;
        colStruct.fCompressionType = colType.compressionType;
        colStruct.colWidth = colType.colWidth;
        colStruct.colDataType = colType.colDataType;
        dctnryStruct.dctnryOid = 0;
        dctnryStruct.columnOid = colStruct.dataOid;
        dctnryStruct.fCompressionType = colType.compressionType;
        dctnryStruct.colWidth = colType.colWidth;
        colStructs.push_back(colStruct);
        dctnryStructList.push_back(dctnryStruct);
        cscColTypeList.push_back(colType);
      }

      // ------------------------------------------------------------------
      // Step 2: convert the values, column by column.
      // ------------------------------------------------------------------
      NullString tmpStr;

      for (unsigned int i = 0; i < numcols; i++)
      {
        WriteEngine::ColTupleList colTuples;
        WriteEngine::DctColTupleList dctColTuples;
        RowList::const_iterator row_iterator = rows.begin();

        while (row_iterator != rows.end())
        {
          Row* curRow = *row_iterator;
          const DMLColumn* columnPtr = curRow->get_ColumnAt(i);

          tableColName.column = columnPtr->get_Name();
          // TODO MCOL-641 remove these calls
          CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);

          CalpontSystemCatalog::ColType colType;
          colType = systemCatalogPtr->colType(oid);

          boost::any datavalue;
          bool isNULL = false;
          bool pushWarning = false;
          std::vector<NullString> origVals;
          origVals = columnPtr->get_DataVector();
          WriteEngine::dictStr dicStrings;

          // token
          if (isDictCol(colType))
          {
            // Dictionary column: the string itself is handed to the write
            // engine; the column tuple only carries an empty boost::any.
            for (uint32_t valIdx = 0; valIdx < origVals.size(); valIdx++)
            {
              tmpStr = origVals[valIdx];

              isNULL = columnPtr->get_isnull() || tmpStr.isNull();

              if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
              {
                if (isNULL && colType.defaultValue.isNull())  // error out
                {
                  Message::Args args;
                  args.add(tableColName.column);
                  err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
                  rc = 1;
                  return rc;
                }
                else if (isNULL && !(colType.defaultValue.isNull()))
                {
                  tmpStr = colType.defaultValue;
                }
              }

              // Over-long strings are cut to the column width and the column
              // is recorded (once) for the truncation warning.
              if (tmpStr.length() > (unsigned int)colType.colWidth)
              {
                tmpStr.assign(tmpStr.unsafeStringRef().substr(0, colType.colWidth));

                if (!pushWarning)
                {
                  pushWarning = true;
                  isWarningSet = true;

                  // rc itself is promoted to IDBRANGE_WARNING at the end of
                  // this function whenever isWarningSet is true.
                  if ((rc != NO_ERROR) && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
                    rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;

                  colNames.push_back(tableColName.column);
                }
              }

              WriteEngine::ColTuple colTuple;
              colTuple.data = datavalue;

              colTuples.push_back(colTuple);
              //@Bug 2515. Only pass string values to write engine
              dicStrings.push_back(tmpStr);
            }

            colValuesList.push_back(colTuples);
            //@Bug 2515. Only pass string values to write engine
            dicStringList.push_back(dicStrings);
          }
          else
          {
            // Non-dictionary column: the text is converted to its native
            // representation by ColType::convertColumnData().
            NullString indata;

            for (uint32_t valIdx = 0; valIdx < origVals.size(); valIdx++)
            {
              indata = origVals[valIdx];

              isNULL = columnPtr->get_isnull() ? true : indata.isNull();

              // check if autoincrement column and value is 0 or null
              uint64_t nextVal = 1;

              if (colType.autoincrement)
              {
                try
                {
                  // WIP What if we combine this and previous loop and fail
                  // after get nextAIValue ?
                  nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
                  fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
                }
                catch (std::exception& ex)
                {
                  err = ex.what();
                  rc = 1;
                  return rc;
                }
              }

              // A NULL or "0" value in an autoincrement column is replaced by
              // the next value reserved from the sequence.
              if (colType.autoincrement && (isNULL || (indata.safeString("").compare("0") == 0)))
              {
                try
                {
                  bool reserved = fDbrm.getAIRange(oid, 1, &nextVal);

                  if (!reserved)
                  {
                    err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
                    rc = 1;
                    return rc;
                  }
                }
                catch (std::exception& ex)
                {
                  err = ex.what();
                  rc = 1;
                  return rc;
                }

                ostringstream oss;
                oss << nextVal;
                indata.assign(oss.str());
                isNULL = false;
              }

              if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
              {
                if (isNULL && colType.defaultValue.isNull())  // error out
                {
                  Message::Args args;
                  args.add(tableColName.column);
                  err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
                  rc = 1;
                  return rc;
                }
                else if (isNULL && !(colType.defaultValue.isNull()))
                {
                  indata = colType.defaultValue;
                  isNULL = false;
                }
              }

              try
              {
                datavalue = colType.convertColumnData(indata, pushWarning, insertPkg.get_TimeZone(),
                                                      false, false);
              }
              catch (exception&)
              {
                rc = 1;
                Message::Args args;
                args.add(string("'") + indata.safeString("<<null>>") + string("'"));
                err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
              }

              //@Bug 1806
              // Anything other than success or a range warning aborts here.
              if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
              {
                return rc;
              }

              if (pushWarning)
              {
                if (!isWarningSet)
                  isWarningSet = true;

                if (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
                  rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;

                colNames.push_back(tableColName.column);
              }

              WriteEngine::ColTuple colTuple;
              colTuple.data = datavalue;

              colTuples.push_back(colTuple);
              //@Bug 2515. Only pass string values to write engine
              // NOTE(review): tmpStr is only assigned in the dictionary branch,
              // so this pushes whatever it last held (or a default NullString).
              dicStrings.push_back(tmpStr);
            }

            colValuesList.push_back(colTuples);
            dicStringList.push_back(dicStrings);
          }

          // MCOL-5021
          // After the last user column, add the AUX column values; "i" is the
          // outer column index.
          if ((i == numcols - 1) && (tableAUXColOid > 3000))
          {
            processAuxCol(origVals, colValuesList, dicStringList);
          }

          ++row_iterator;
        }
      }
    }
  }
  catch (exception& ex)
  {
    rc = 1;
    err = ex.what();
    return rc;
  }

  // call the write engine to write the rows
  int error = NO_ERROR;
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);
  fWEWrapper.setTransId(txnid.id);
  // For hdfs use only
  uint32_t tblOid = tableRoPair.objnum;

  // WIP are we saving HDFS?
  if (idbdatafile::IDBPolicy::useHdfs())
  {
    // Save bulk-rollback meta data for all columns of the table before
    // anything is written, so that the insert can be rolled back.
    std::vector<Column> columns;
    DctnryStructList dctnryList;
    CalpontSystemCatalog::ColType colType;
    std::vector<DBRootExtentInfo> colDBRootExtentInfo;
    Convertor convertor;
    dbRootExtTrackerVec.clear();
    fRBMetaWriter.reset(new RBMetaWriter("SingleInsert", NULL));
    CalpontSystemCatalog::RIDList ridList;

    try
    {
      ridList = systemCatalogPtr->columnRIDs(tableName, true);
      std::vector<OID> dctnryStoreOids(ridList.size());
      std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
      bool bFirstExtentOnThisPM = false;

      // First gather HWM BRM information for all columns
      std::vector<int> colWidths;

      for (unsigned i = 0; i < ridList.size(); i++)
      {
        rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
        // need handle error

        CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
        colWidths.push_back(convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth));
      }

      for (unsigned i = 0; i < ridList.size(); i++)
      {
        // Find DBRoot/segment file where we want to start adding rows
        colType = systemCatalogPtr->colType(ridList[i].objnum);
        boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker(
            new DBRootExtentTracker(ridList[i].objnum, colWidths, dbRootHWMInfoColVec, i, 0));
        dbRootExtTrackerVec.push_back(pDBRootExtentTracker);
        DBRootExtentInfo dbRootExtent;
        std::string trkErrMsg;
        bool bEmptyPM;

        // The first column selects the segment file; all other columns are
        // assigned the one chosen by the first column's tracker.
        if (i == 0)
          rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM,
                                                        trkErrMsg);
        else
          pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);

        colDBRootExtentInfo.push_back(dbRootExtent);

        Column aColumn;
        aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
        aColumn.colDataType = colType.colDataType;
        aColumn.compressionType = colType.compressionType;
        aColumn.dataFile.oid = ridList[i].objnum;
        aColumn.dataFile.fPartition = dbRootExtent.fPartition;
        aColumn.dataFile.fSegment = dbRootExtent.fSegment;
        aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
        aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
        columns.push_back(aColumn);

        // Compressed dictionary stores get their HWM chunk backed up below.
        if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
        {
          DctnryStruct aDctnry;
          aDctnry.dctnryOid = colType.ddn.dictOID;
          aDctnry.fColPartition = dbRootExtent.fPartition;
          aDctnry.fColSegment = dbRootExtent.fSegment;
          aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
          dctnryList.push_back(aDctnry);
        }

        if (colType.ddn.dictOID > 0)
        {
          dctnryStoreOids[i] = colType.ddn.dictOID;
        }
        else
        {
          dctnryStoreOids[i] = 0;
        }
      }

      fRBMetaWriter->init(tblOid, tableName.table);
      fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);

      for (unsigned i = 0; i < dctnryList.size(); i++)  // back up chunks for compressed dictionary
      {
        // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
        fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot,
                                            dctnryList[i].fColPartition, dctnryList[i].fColSegment);
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }
  }

  // colValuesList stays empty when the package carried no rows, so it must be
  // checked before its first element is inspected.
  if (!colValuesList.empty() && colValuesList[0].size() > 0)
  {
    if (NO_ERROR !=
        (error = fWEWrapper.insertColumnRec_Single(txnid.id, cscColTypeList, colStructs, colValuesList,
                                                   dctnryStructList, dicStringList, tableRoPair.objnum)))
    {
      if (error == ERR_BRM_DEAD_LOCK)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
        WErrorCodes ec;
        err = ec.errorString(error);
      }
      else if (error == ERR_BRM_VB_OVERFLOW)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
        err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
      }
      else
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
        WErrorCodes ec;
        err = ec.errorString(error);
      }
    }
  }

  // Collect the column and dictionary OIDs that have to be flushed.
  std::map<uint32_t, uint32_t> oids;
  std::vector<BRM::OID_t> oidsToFlush;

  for (unsigned i = 0; i < colStructs.size(); i++)
  {
    oids[colStructs[i].dataOid] = colStructs[i].dataOid;
    oidsToFlush.push_back(colStructs[i].dataOid);
  }

  for (unsigned i = 0; i < dctnryStructList.size(); i++)
  {
    oids[dctnryStructList[i].dctnryOid] = dctnryStructList[i].dctnryOid;
    oidsToFlush.push_back(dctnryStructList[i].dctnryOid);
  }

  fWEWrapper.setTransId(txnid.id);
  vector<LBID_t> lbidList;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    // XXX THIS IS WRONG
    // save the extent info to mark them invalid, after flush, the meta file will be gone.
    std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap = fWEWrapper.getTxnMap();

    try
    {
      auto mapIter = m_txnLBIDMap.find(txnid.id);

      if (mapIter != m_txnLBIDMap.end())
      {
        SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
        lbidList = spTxnLBIDRec->m_LBIDs;
      }
    }
    catch (...)
    {
      // Best effort only: without the LBID list nothing is invalidated below.
    }
  }

  // flush files
  // @bug5333, up to here, rc may have an error code already, don't overwrite it.
  int flushChunksRc = fWEWrapper.flushChunks(0, oids);  // why not pass rc to flushChunks?

  if (flushChunksRc != NO_ERROR)
  {
    WErrorCodes ec;
    std::ostringstream ossErr;
    ossErr << "Error flushing chunks for table " << tableName << "; " << ec.errorString(flushChunksRc);

    // Append to errmsg in case we already have an error
    if (err.length() > 0)
      err += "; ";

    err += ossErr.str();

    if (error == NO_ERROR)
      error = flushChunksRc;

    if ((rc == NO_ERROR) || (rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
      rc = 1;  // return hardcoded 1 as the above
  }

  // Confirm HDFS DB file changes "only" if no error up to this point
  if (idbdatafile::IDBPolicy::useHdfs())
  {
    if (error == NO_ERROR)
    {
      std::string eMsg;
      ConfirmHdfsDbFile confirmHdfs;
      error = confirmHdfs.confirmDbFileListFromMetaFile(tblOid, eMsg);

      if (error != NO_ERROR)
      {
        ostringstream ossErr;
        ossErr << "Error confirming changes to table " << tableName << "; " << eMsg;
        err = ossErr.str();
        rc = 1;
      }
      else  // Perform extra cleanup that is necessary for HDFS
      {
        std::string eMsg;
        ConfirmHdfsDbFile confirmHdfs;
        int confirmRc2 = confirmHdfs.endDbFileListFromMetaFile(tblOid, true, eMsg);

        if (confirmRc2 != NO_ERROR)
        {
          // Might want to log this error, but don't think we need
          // to report as fatal, since all changes were confirmed.
        }

        // flush PrimProc FD cache
        TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tblOid);
        ColsExtsInfoMap colsExtsInfoMap = aTableMetaData->getColsExtsInfoMap();
        ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
        ColExtsInfo::iterator aIt;
        std::vector<BRM::FileInfo> files;
        BRM::FileInfo aFile;

        while (it != colsExtsInfoMap.end())
        {
          aIt = (it->second).begin();
          aFile.oid = it->first;

          while (aIt != (it->second).end())
          {
            aFile.partitionNum = aIt->partNum;
            aFile.dbRoot = aIt->dbRoot;
            aFile.segmentNum = aIt->segNum;
            aFile.compType = aIt->compType;
            files.push_back(aFile);
            aIt++;
          }

          it++;
        }

        if (files.size() > 0)
          cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());

        cacheutils::flushOIDsFromCache(oidsToFlush);
        fDbrm.invalidateUncommittedExtentLBIDs(0, false, &lbidList);

        try
        {
          BulkRollbackMgr::deleteMetaFile(tblOid);
        }
        catch (exception& ex)
        {
          err = ex.what();
          rc = 1;
        }
      }
    }  // (error == NO_ERROR) through call to flushChunks()

    if (error != NO_ERROR)  // rollback
    {
      string applName("SingleInsert");
      fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err);
      BulkRollbackMgr::deleteMetaFile(tblOid);
    }
  }  // extra hdfs steps

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  TableMetaData::removeTableMetaData(tblOid);

  if ((rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) || isWarningSet)
  {
    if (rc == NO_ERROR)
      rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;

    // Build the quoted, comma-separated list of affected columns.  The list
    // is iterated from the start so an empty colNames is never indexed.
    Message::Args args;
    string cols;

    for (size_t nameIdx = 0; nameIdx < colNames.size(); nameIdx++)
    {
      if (nameIdx > 0)
        cols += ", ";

      cols += "'" + colNames[nameIdx] + "'";
    }

    args.add(cols);
    err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);

    // Strict mode enabled, so rollback on warning
    if (insertPkg.get_isWarnToError())
    {
      string applName("SingleInsert");
      fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err);
      BulkRollbackMgr::deleteMetaFile(tblOid);
    }
  }

  // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);

  return rc;
}

// Commits a transaction through the write engine wrapper.
//
// @param bs   request stream; a 32-bit transaction id is read from it
// @param err  receives a description of the failure; untouched on success
// @return 0 on success, non-zero on failure.  The write-engine code is
//         narrowed to 8 bits; a non-zero code whose low byte is 0 is reported
//         as 1 so that a failure can never be mistaken for success.
uint8_t WE_DMLCommandProc::commitVersion(ByteStream& bs, std::string& err)
{
  uint32_t tmp32;
  bs >> tmp32;
  const int txnID = tmp32;

  const int rc = fWEWrapper.commit(txnID);

  if (rc == 0)
    return 0;

  WErrorCodes ec;
  ostringstream oss;
  oss << "WE: Error committing transaction " << txnID << "; " << ec.errorString(rc) << endl;
  err = oss.str();

  // Converting int to uint8_t keeps only the low 8 bits of the code.
  const uint8_t narrowedRc = static_cast<uint8_t>(rc);
  return (narrowedRc != 0) ? narrowedRc : static_cast<uint8_t>(1);
}

// Rolls back the blocks of a transaction through the write engine wrapper.
//
// @param bs   request stream; the session id and then the transaction id
//             (both 32-bit) are read from it
// @param err  receives the exception text when the wrapper throws; it is not
//             modified when the wrapper merely returns a non-zero code
// @return 0 on success, 1 when an exception was caught, otherwise the
//         wrapper's code narrowed to 8 bits.  A non-zero code whose low byte
//         is 0 is reported as 1 so that a failure is never reported as success.
uint8_t WE_DMLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  bs >> sessionID;
  bs >> tmp32;
  const int txnID = tmp32;

  try
  {
    rc = fWEWrapper.rollbackBlocks(txnID, sessionID);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    return 1;
  }

  if (rc == 0)
    return 0;

  // Converting int to uint8_t keeps only the low 8 bits of the code.
  const uint8_t narrowedRc = static_cast<uint8_t>(rc);
  return (narrowedRc != 0) ? narrowedRc : static_cast<uint8_t>(1);
}

// Rolls back the version information of a transaction through the write
// engine wrapper.
//
// @param bs   request stream; the session id and then the transaction id
//             (both 32-bit) are read from it
// @param err  receives a description of the failure; untouched on success
// @return 0 on success, non-zero on failure.  The write-engine code is
//         narrowed to 8 bits; a non-zero code whose low byte is 0 is reported
//         as 1 so that a failure can never be mistaken for success.
uint8_t WE_DMLCommandProc::rollbackVersion(ByteStream& bs, std::string& err)
{
  uint32_t sessionID, tmp32;
  bs >> sessionID;
  bs >> tmp32;
  const int txnID = tmp32;

  const int rc = fWEWrapper.rollbackVersion(txnID, sessionID);

  if (rc == 0)
    return 0;

  WErrorCodes ec;
  ostringstream oss;
  oss << "WE: Error rolling back transaction " << txnID << " for session " << sessionID << "; "
      << ec.errorString(rc) << endl;
  err = oss.str();

  // Converting int to uint8_t keeps only the low 8 bits of the code.
  const uint8_t narrowedRc = static_cast<uint8_t>(rc);
  return (narrowedRc != 0) ? narrowedRc : static_cast<uint8_t>(1);
}

uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::string& err,
                                              ByteStream::quadbyte& PMId)
{
  int rc = 0;

  InsertDMLPackage insertPkg;
  ByteStream::quadbyte tmp32;
  bs >> tmp32;
  bs >> PMId;
  insertPkg.read(bs);
  uint32_t sessionId = insertPkg.get_SessionID();
  DMLTable* tablePtr = insertPkg.get_Table();
  bool isAutocommitOn = insertPkg.get_isAutocommitOn();

  if (idbdatafile::IDBPolicy::useHdfs())
    isAutocommitOn = true;

  BRM::TxnID txnid;
  txnid.id = tmp32;
  txnid.valid = true;
  RowList rows = tablePtr->get_RowList();
  bool isInsertSelect = insertPkg.get_isInsertSelect();

  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::DictStrList dicStringList;
  CalpontSystemCatalog::TableName tableName;
  CalpontSystemCatalog::TableColName tableColName;
  tableName.table = tableColName.table = tablePtr->get_TableName();
  tableName.schema = tableColName.schema = tablePtr->get_SchemaName();
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::ROPair roPair;
  CalpontSystemCatalog::OID tableAUXColOid;
  CalpontSystemCatalog::RIDList ridList;
  CalpontSystemCatalog::DictOIDList dictOids;

  try
  {
    ridList = systemCatalogPtr->columnRIDs(tableName, true);
    roPair = systemCatalogPtr->tableRID(tableName);
    tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  uint32_t sizeToAdd = (tableAUXColOid > 3000 ? 1 : 0);
  std::vector<OID> dctnryStoreOids(ridList.size() + sizeToAdd);
  std::vector<Column> columns;
  DctnryStructList dctnryList;
  std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size() + sizeToAdd);

  uint32_t tblOid = roPair.objnum;
  std::vector<DBRootExtentInfo> colDBRootExtentInfo;
  bool bFirstExtentOnThisPM = false;
  Convertor convertor;

  if (fIsFirstBatchPm)
  {
    dbRootExtTrackerVec.clear();

    if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn)))
      fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));

    fWEWrapper.setIsInsert(true);
    fWEWrapper.setBulkFlag(true);
    fWEWrapper.setTransId(txnid.id);

    try
    {
      std::vector<CalpontSystemCatalog::ColType> colTypes;

      for (unsigned i = 0; i < ridList.size(); i++)
      {
        CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(ridList[i].objnum);
        colTypes.push_back(colType);

        if (colType.ddn.dictOID > 0)
        {
          dctnryStoreOids[i] = colType.ddn.dictOID;
        }
        else
        {
          dctnryStoreOids[i] = 0;
        }
      }

      // First gather HWM BRM information for all columns
      std::vector<int> colWidths;
      for (unsigned i = 0; i < ridList.size(); i++)
      {
        rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
        // need handle error
        colWidths.push_back(convertor.getCorrectRowWidth(colTypes[i].colDataType, colTypes[i].colWidth));
      }

      // MCOL-5021
      if (tableAUXColOid > 3000)
      {
        rc = BRMWrapper::getInstance()->getDbRootHWMInfo(tableAUXColOid, dbRootHWMInfoColVec[ridList.size()]);
        colWidths.push_back(execplan::AUX_COL_WIDTH);
        dctnryStoreOids[ridList.size()] = 0;
        CalpontSystemCatalog::ROPair auxRoPair;
        auxRoPair.rid = ridList.back().rid + 1;
        auxRoPair.objnum = tableAUXColOid;
        ridList.push_back(auxRoPair);
      }

      for (unsigned i = 0; i < ridList.size(); i++)
      {
        uint32_t objNum = ridList[i].objnum;

        // Find DBRoot/segment file where we want to start adding rows
        boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker(
            new DBRootExtentTracker(objNum, colWidths, dbRootHWMInfoColVec, i, 0));
        dbRootExtTrackerVec.push_back(pDBRootExtentTracker);
        DBRootExtentInfo dbRootExtent;
        std::string trkErrMsg;
        bool bEmptyPM;

        if (i == 0)
        {
          rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM,
                                                        trkErrMsg);
        }
        else
          pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);

        colDBRootExtentInfo.push_back(dbRootExtent);

        Column aColumn;

        if ((i == ridList.size() - 1) && (tableAUXColOid > 3000)) // AUX column
        {
          aColumn.colWidth = execplan::AUX_COL_WIDTH;
          aColumn.colDataType = execplan::AUX_COL_DATATYPE;
          // TODO MCOL-5021 compressionType for the AUX column is hard-coded to 2
          aColumn.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
        }
        else
        {
          const CalpontSystemCatalog::ColType& colType = colTypes[i];
          aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
          aColumn.colDataType = colType.colDataType;
          aColumn.compressionType = colType.compressionType;

          if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
          {
            DctnryStruct aDctnry;
            aDctnry.dctnryOid = colType.ddn.dictOID;
            aDctnry.fColPartition = dbRootExtent.fPartition;
            aDctnry.fColSegment = dbRootExtent.fSegment;
            aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
            dctnryList.push_back(aDctnry);
          }
        }

        aColumn.dataFile.oid = objNum;
        aColumn.dataFile.fPartition = dbRootExtent.fPartition;
        aColumn.dataFile.fSegment = dbRootExtent.fSegment;
        aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
        aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
        columns.push_back(aColumn);
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }

    //@Bug 5996 validate hwm before starts
    rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting",
                            tableAUXColOid > 3000);

    if (rc != 0)
    {
      WErrorCodes ec;
      err = ec.errorString(rc);
      err += " Check err.log for detailed information.";
      fIsFirstBatchPm = false;
      rc = 1;
      return rc;
    }
  }

  std::vector<BRM::LBIDRange> rangeList;

  // use of MetaFile for bulk rollback support
  if (fIsFirstBatchPm && isAutocommitOn)
  {
    // save meta data, version last block for each dbroot at the start of batch insert
    try
    {
      fRBMetaWriter->init(tblOid, tableName.table);
      fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);

      if (!bFirstExtentOnThisPM)
      {
        for (unsigned i = 0; i < dctnryList.size(); i++)  // back up chunks for compressed dictionary
        {
          // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
          fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot,
                                              dctnryList[i].fColPartition, dctnryList[i].fColSegment);
        }
      }
    }
    catch (WeException& ex)  // catch exception to close file, then rethrow
    {
      rc = 1;
      err = ex.what();
    }

    // Do versioning. Currently, we only version columns, not strings. If there is a design change, this will
    // need to be re-visited
    if (rc != 0)
      return rc;
  }

  std::vector<string> colNames;
  bool isWarningSet = false;

  if (rows.size())
  {
    Row* rowPtr = rows.at(0);
    ColumnList columns = rowPtr->get_ColumnList();
    ColumnList::const_iterator column_iterator = columns.begin();

    try
    {
      while (column_iterator != columns.end())
      {
        DMLColumn* columnPtr = *column_iterator;
        tableColName.column = columnPtr->get_Name();
        CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName);

        CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);

        CalpontSystemCatalog::ColType colType;
        colType = systemCatalogPtr->colType(oid);

        WriteEngine::ColStruct colStruct;
        WriteEngine::DctnryStruct dctnryStruct;
        colStruct.dataOid = roPair.objnum;
        colStruct.tokenFlag = false;
        colStruct.fCompressionType = colType.compressionType;

        // Token
        if (isDictCol(colType))
        {
          colStruct.colWidth = 8;
          colStruct.tokenFlag = true;
        }
        else
        {
          colStruct.colWidth = colType.colWidth;
        }

        colStruct.colDataType = colType.colDataType;

        dctnryStruct.fCharsetNumber = colType.charsetNumber;

        if (colStruct.tokenFlag)
        {
          dctnryStruct.dctnryOid = colType.ddn.dictOID;
          dctnryStruct.columnOid = colStruct.dataOid;
          dctnryStruct.fCompressionType = colType.compressionType;
          dctnryStruct.colWidth = colType.colWidth;
        }
        else
        {
          dctnryStruct.dctnryOid = 0;
          dctnryStruct.columnOid = colStruct.dataOid;
          dctnryStruct.fCompressionType = colType.compressionType;
          dctnryStruct.colWidth = colType.colWidth;
        }

        colStructs.push_back(colStruct);
        dctnryStructList.push_back(dctnryStruct);
        cscColTypeList.push_back(colType);

        ++column_iterator;
      }

      // MCOL-5021 Valid AUX column OID for a table is > 3000
      // Tables that were created before this feature was added will have
      // tableAUXColOid = 0
      if (tableAUXColOid > 3000)
      {
        CalpontSystemCatalog::ColType colType;
        colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
        colType.colWidth = execplan::AUX_COL_WIDTH;
        colType.colDataType = execplan::AUX_COL_DATATYPE;
        WriteEngine::ColStruct colStruct;
        WriteEngine::DctnryStruct dctnryStruct;
        colStruct.dataOid = tableAUXColOid;
        colStruct.tokenFlag = false;
        colStruct.fCompressionType = colType.compressionType;
        colStruct.colWidth = colType.colWidth;
        colStruct.colDataType = colType.colDataType;
        dctnryStruct.dctnryOid = 0;
        dctnryStruct.columnOid = colStruct.dataOid;
        dctnryStruct.fCompressionType = colType.compressionType;
        dctnryStruct.colWidth = colType.colWidth;
        colStructs.push_back(colStruct);
        dctnryStructList.push_back(dctnryStruct);
        cscColTypeList.push_back(colType);
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }

    unsigned int numcols = rowPtr->get_NumberOfColumns();
    NullString tmpStr;

    try
    {
      for (unsigned int i = 0; i < numcols; i++)
      {
        WriteEngine::ColTupleList colTuples;
        WriteEngine::DctColTupleList dctColTuples;
        RowList::const_iterator row_iterator = rows.begin();
        bool pushWarning = false;

        while (row_iterator != rows.end())
        {
          Row* rowPtr = *row_iterator;
          const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i);

          tableColName.column = columnPtr->get_Name();
          CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);

          CalpontSystemCatalog::ColType colType;
          colType = systemCatalogPtr->colType(oid);

          boost::any datavalue;
          bool isNULL = false;
          std::vector<NullString> origVals;
          origVals = columnPtr->get_DataVector();
          WriteEngine::dictStr dicStrings;

          // token
          if (isDictCol(colType))
          {
            for (uint32_t i = 0; i < origVals.size(); i++)
            {
              tmpStr = origVals[i];

              isNULL = tmpStr.isNull();

              if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
              {
                if (isNULL && colType.defaultValue.isNull())  // error out
                {
                  Message::Args args;
                  args.add(tableColName.column);
                  err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
                  rc = 1;
                  return rc;
                }
                else if (isNULL && !(colType.defaultValue.isNull()))
                {
                  tmpStr = colType.defaultValue;
                }
              }

              if (tmpStr.length() > (unsigned int)colType.colWidth)
              {
                tmpStr.assign(tmpStr.unsafeStringRef().substr(0, colType.colWidth));

                if (!pushWarning)
                  pushWarning = true;
              }

              WriteEngine::ColTuple colTuple;
              colTuple.data = datavalue;

              colTuples.push_back(colTuple);
              //@Bug 2515. Only pass string values to write engine
              dicStrings.push_back(tmpStr);
            }

            colValuesList.push_back(colTuples);
            //@Bug 2515. Only pass string values to write engine
            dicStringList.push_back(dicStrings);
          }
          else
          {
            string x;
            NullString indata;
            // Scan once to count how many autoincrement values are needed.
            uint32_t nextValNeeded = 0;
            uint64_t nextVal = 1;

            if (colType.autoincrement)
            {
              try
              {
                nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
                fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
              }
              catch (std::exception& ex)
              {
                err = ex.what();
                rc = 1;
                return rc;
              }

              for (uint32_t i = 0; i < origVals.size(); i++)
              {
                indata = origVals[i];

                if (indata.isNull())
                  isNULL = true;
                else
                  isNULL = false;

                if (isNULL || (indata.safeString("").compare("0") == 0))
                  nextValNeeded++;
              }
            }

            if (nextValNeeded > 0)  // reserve next value
            {
              try
              {
                bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal);

                if (!reserved)
                {
                  err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
                  rc = 1;
                  return rc;
                }
              }
              catch (std::exception& ex)
              {
                err = ex.what();
                rc = 1;
                return rc;
              }
            }

            for (uint32_t i = 0; i < origVals.size(); i++)
            {
              indata = origVals[i];

              if (indata.isNull())
                isNULL = true;
              else
                isNULL = false;

              // check if autoincrement column and value is 0 or null
              if (colType.autoincrement && (isNULL || (indata.safeString("").compare("0") == 0)))
              {
                ostringstream oss;
                oss << nextVal++;
                indata.assign(oss.str());
                isNULL = false;
              }

              if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
              {
                if (isNULL && colType.defaultValue.isNull())  // error out
                {
                  Message::Args args;
                  args.add(tableColName.column);
                  err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
                  rc = 1;
                  return rc;
                }
                else if (isNULL && !(colType.defaultValue.isNull()))
                {
                  indata = colType.defaultValue;
                  isNULL = false;
                }
              }

              try
              {
                datavalue = colType.convertColumnData(indata, pushWarning, insertPkg.get_TimeZone(),
                                                      false, false);
              }
              catch (exception&)
              {
                rc = 1;
                Message::Args args;
                args.add(string("'") + indata.safeString("<<null>>") + string("'"));
                err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
              }

              //@Bug 1806
              if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
              {
                return rc;
              }

              if (pushWarning && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
                rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;

              WriteEngine::ColTuple colTuple;
              colTuple.data = datavalue;

              colTuples.push_back(colTuple);
              //@Bug 2515. Only pass string values to write engine
              dicStrings.push_back(tmpStr);
            }

            colValuesList.push_back(colTuples);
            dicStringList.push_back(dicStrings);
          }

          // MCOL-5021
          if ((i == numcols - 1) && (tableAUXColOid > 3000))
          {
            processAuxCol(origVals, colValuesList, dicStringList);
          }

          ++row_iterator;
        }

        if (pushWarning)
        {
          colNames.push_back(tableColName.column);
          isWarningSet = true;
        }
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }
  }

  // call the write engine to write the rows
  int error = NO_ERROR;

  if (colValuesList.size() > 0)
  {
    if (colValuesList[0].size() > 0)
    {
      /* Begin-Disable use of MetaFile for bulk rollback support;
         Use alternate call below that passes 0 ptr for RBMetaWriter
                          if (NO_ERROR !=
                          (error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList,
      dctnryStructList, dicStringList, dbRootExtTrackerVec, fRBMetaWriter.get(), bFirstExtentOnThisPM,
      isInsertSelect, 0, roPair.objnum, fIsFirstBatchPm))) End-Disable use of MetaFile for bulk rollback
      support
      */

      if (NO_ERROR != (error = fWEWrapper.insertColumnRecs(
                           txnid.id, cscColTypeList, colStructs, colValuesList, dctnryStructList,
                           dicStringList, dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect,
                           isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
      {
        if (error == ERR_BRM_DEAD_LOCK)
        {
          rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
          WErrorCodes ec;
          err = ec.errorString(error);
        }
        else if (error == ERR_BRM_VB_OVERFLOW)
        {
          rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
          err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
        }
        else
        {
          rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
          WErrorCodes ec;
          err = ec.errorString(error);
        }
      }
    }
  }

  if (fIsFirstBatchPm && isAutocommitOn)
  {
    // fWEWrapper.writeVBEnd(txnid.id, rangeList);
    fIsFirstBatchPm = false;
  }
  else if (fIsFirstBatchPm)
  {
    fIsFirstBatchPm = false;
  }

  if (isWarningSet && (rc == NO_ERROR))
  {
    rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
    Message::Args args;
    string cols = "'" + colNames[0] + "'";

    for (unsigned i = 1; i < colNames.size(); i++)
    {
      cols = cols + ", " + "'" + colNames[i] + "'";
    }

    args.add(cols);
    err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);

    // Strict mode enabled, so rollback on warning
    if (insertPkg.get_isWarnToError())
    {
      string applName("BatchInsert");
      fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err);
      BulkRollbackMgr::deleteMetaFile(tblOid);
    }
  }

  // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
  return rc;
}

uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err,
                                                    ByteStream::quadbyte& PMId)
{
  int rc = 0;
  // cout << "processBatchInsert received bytestream length " << bs.length() << endl;

  ByteStream::quadbyte tmp32;
  ByteStream::byte tmp8;
  bs >> tmp32;
  // cout << "processBatchInsert got transaction id " << tmp32 << endl;
  bs >> PMId;
  // cout << "processBatchInsert got PMId " << PMId << endl;
  uint32_t sessionId;
  bs >> sessionId;
  // cout << " processBatchInsert for session " << sessionId << endl;
  bool isAutocommitOn;
  bs >> tmp8;
  isAutocommitOn = tmp8;

  if (idbdatafile::IDBPolicy::useHdfs())
    isAutocommitOn = true;

  // cout << "This session isAutocommitOn is " << isAutocommitOn << endl;
  BRM::TxnID txnid;
  txnid.id = tmp32;
  txnid.valid = true;
  bool isInsertSelect;
  bs >> tmp8;
  // For insert select, skip the hwm block and start inserting from the next block
  // to avoid self insert issue.
  // For batch insert: if not first batch, use the saved last rid to start adding rows.
  isInsertSelect = tmp8;

  WriteEngine::ColStructList colStructs;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  std::vector<uint64_t> colValuesList;
  WriteEngine::DictStrList dicStringList;
  CalpontSystemCatalog::TableName tableName;
  CalpontSystemCatalog::TableColName tableColName;
  bs >> tableColName.table;
  bs >> tableColName.schema;
  tableName.table = tableColName.table;
  tableName.schema = tableColName.schema;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::ROPair roPair;
  CalpontSystemCatalog::RIDList ridList;
  CalpontSystemCatalog::DictOIDList dictOids;

  try
  {
    ridList = systemCatalogPtr->columnRIDs(tableName, true);
    roPair = systemCatalogPtr->tableRID(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  std::vector<OID> dctnryStoreOids(ridList.size());
  std::vector<Column> columns;
  DctnryStructList dctnryList;
  std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());

  uint32_t tblOid = roPair.objnum;
  CalpontSystemCatalog::ColType colType;
  std::vector<DBRootExtentInfo> colDBRootExtentInfo;
  bool bFirstExtentOnThisPM = false;
  Convertor convertor;

  if (fIsFirstBatchPm)
  {
    dbRootExtTrackerVec.clear();

    if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn)))
      fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));

    fWEWrapper.setIsInsert(true);
    fWEWrapper.setBulkFlag(true);
    fWEWrapper.setTransId(txnid.id);

    try
    {
      // First gather HWM BRM information for all columns
      std::vector<int> colWidths;

      for (unsigned i = 0; i < ridList.size(); i++)
      {
        rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
        // TODO: rc is not checked here, so a getDbRootHWMInfo failure is currently ignored.

        CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
        colWidths.push_back(convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth));
      }

      for (unsigned i = 0; i < ridList.size(); i++)
      {
        // Find DBRoot/segment file where we want to start adding rows
        colType = systemCatalogPtr->colType(ridList[i].objnum);
        boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker(
            new DBRootExtentTracker(ridList[i].objnum, colWidths, dbRootHWMInfoColVec, i, 0));
        dbRootExtTrackerVec.push_back(pDBRootExtentTracker);
        DBRootExtentInfo dbRootExtent;
        std::string trkErrMsg;
        bool bEmptyPM;

        if (i == 0)
        {
          rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM,
                                                        trkErrMsg);
          /*	cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " <<
             (int)bFirstExtentOnThisPM << " oid:dbroot:hwm = " << ridList[i].objnum <<
             ":"<<dbRootExtent.fDbRoot << ":"
              <<":"<<dbRootExtent.fLocalHwm << " err = " << trkErrMsg << endl; */
        }
        else
          pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);

        colDBRootExtentInfo.push_back(dbRootExtent);

        Column aColumn;
        aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
        aColumn.colDataType = colType.colDataType;
        aColumn.compressionType = colType.compressionType;
        aColumn.dataFile.oid = ridList[i].objnum;
        aColumn.dataFile.fPartition = dbRootExtent.fPartition;
        aColumn.dataFile.fSegment = dbRootExtent.fSegment;
        aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
        aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
        columns.push_back(aColumn);

        if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
        {
          DctnryStruct aDctnry;
          aDctnry.dctnryOid = colType.ddn.dictOID;
          aDctnry.fColPartition = dbRootExtent.fPartition;
          aDctnry.fColSegment = dbRootExtent.fSegment;
          aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
          dctnryList.push_back(aDctnry);
        }

        if (colType.ddn.dictOID > 0)
        {
          dctnryStoreOids[i] = colType.ddn.dictOID;
        }
        else
        {
          dctnryStoreOids[i] = 0;
        }
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }

    // @Bug 5996 validate hwm before starts
    // TODO MCOL-5021 hasAuxCol is hardcoded to false; add support here.
    rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting", false);

    if (rc != 0)
    {
      WErrorCodes ec;
      err = ec.errorString(rc);
      err += " Check err.log for detailed information.";
      fIsFirstBatchPm = false;
      rc = 1;
      return rc;
    }
  }

  std::vector<BRM::LBIDRange> rangeList;

  // use of MetaFile for bulk rollback support
  if (fIsFirstBatchPm && isAutocommitOn)
  {
    // save meta data, version last block for each dbroot at the start of batch insert
    try
    {
      fRBMetaWriter->init(tblOid, tableName.table);
      fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);

      // cout << "Saved meta files" << endl;
      if (!bFirstExtentOnThisPM)
      {
        // cout << "Backing up hwm chunks" << endl;
        for (unsigned i = 0; i < dctnryList.size(); i++)  // back up chunks for compressed dictionary
        {
          // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
          fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot,
                                              dctnryList[i].fColPartition, dctnryList[i].fColSegment);
        }
      }
    }
    catch (WeException& ex)  // catch exception to close file, then rethrow
    {
      rc = 1;
      err = ex.what();
    }

    // Do versioning. Currently, we only version columns, not strings. If there is a design change, this will
    // need to be re-visited
    if (rc != 0)
      return rc;
  }

  std::vector<string> colNames;
  bool isWarningSet = false;
  uint32_t columnCount;
  bs >> columnCount;

  if (columnCount)
  {
    try
    {
      for (uint32_t current_column = 0; current_column < columnCount; current_column++)
      {
        uint32_t tmp32;
        std::string colName;
        bs >> tmp32;
        bs >> colName;
        colNames.push_back(colName);
        CalpontSystemCatalog::OID oid = tmp32;

        CalpontSystemCatalog::ColType colType;
        colType = systemCatalogPtr->colType(oid);

        WriteEngine::ColStruct colStruct;
        WriteEngine::DctnryStruct dctnryStruct;
        colStruct.dataOid = oid;
        colStruct.tokenFlag = false;
        colStruct.fCompressionType = colType.compressionType;

        // Token
        if (isDictCol(colType))
        {
          colStruct.colWidth = 8;
          colStruct.tokenFlag = true;
        }
        else
        {
          colStruct.colWidth = colType.colWidth;
        }

        colStruct.colDataType = colType.colDataType;

        dctnryStruct.fCharsetNumber = colType.charsetNumber;

        if (colStruct.tokenFlag)
        {
          dctnryStruct.dctnryOid = colType.ddn.dictOID;
          dctnryStruct.columnOid = colStruct.dataOid;
          dctnryStruct.fCompressionType = colType.compressionType;
          dctnryStruct.colWidth = colType.colWidth;
        }
        else
        {
          dctnryStruct.dctnryOid = 0;
          dctnryStruct.columnOid = colStruct.dataOid;
          dctnryStruct.fCompressionType = colType.compressionType;
          dctnryStruct.colWidth = colType.colWidth;
        }

        colStructs.push_back(colStruct);
        dctnryStructList.push_back(dctnryStruct);
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }

    NullString tmpStr;
    uint32_t valuesPerColumn;
    bs >> valuesPerColumn;
    colValuesList.reserve(columnCount * valuesPerColumn);

    try
    {
      bool pushWarning = false;

      for (uint32_t j = 0; j < columnCount; j++)
      {
        WriteEngine::DctColTupleList dctColTuples;
        tableColName.column = colNames[j];
        CalpontSystemCatalog::OID oid = colStructs[j].dataOid;

        CalpontSystemCatalog::ColType colType;
        colType = systemCatalogPtr->colType(oid);

        bool isNULL = false;
        WriteEngine::dictStr dicStrings;

        // token
        if (isDictCol(colType))
        {
          for (uint32_t i = 0; i < valuesPerColumn; i++)
          {
            bs >> tmpStr;

            isNULL = tmpStr.isNull();

            if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
            {
              if (isNULL && colType.defaultValue.isNull())  // error out
              {
                Message::Args args;
                args.add(tableColName.column);
                err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
                rc = 1;
                return rc;
              }
              else if (isNULL && !(colType.defaultValue.isNull()))
              {
                tmpStr = colType.defaultValue;
              }
            }

            if (tmpStr.length() > (unsigned int)colType.colWidth)
            {
              tmpStr.assign(tmpStr.unsafeStringRef().substr(0, colType.colWidth));

              if (!pushWarning)
                pushWarning = true;
            }

            colValuesList.push_back(0);
            //@Bug 2515. Only pass string values to write engine
            dicStrings.push_back(tmpStr);
          }

          //@Bug 2515. Only pass string values to write engine
          dicStringList.push_back(dicStrings);
        }
        else
        {
          string x;
          // Scan once to count how many autoincrement values are needed.
          uint32_t nextValNeeded = 0;
          uint64_t nextVal = 1;

          if (colType.autoincrement)
          {
            try
            {
              nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
              fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
            }
            catch (std::exception& ex)
            {
              err = ex.what();
              rc = 1;
              return rc;
            }
          }

          for (uint32_t i = 0; i < valuesPerColumn; i++)
          {
            bs >> tmp8;
            isNULL = tmp8;

            uint8_t val8;
            uint16_t val16;
            uint32_t val32;
            uint64_t val64;
            uint64_t colValue;
            float valF;
            double valD;
            NullString valStr;
            bool valZero = false;  // Needed for autoinc check

            switch (colType.colDataType)
            {
              case execplan::CalpontSystemCatalog::TINYINT:
              case execplan::CalpontSystemCatalog::UTINYINT:
                bs >> val8;

                if (val8 == 0)
                  valZero = true;

                colValue = val8;
                break;

              case execplan::CalpontSystemCatalog::SMALLINT:
              case execplan::CalpontSystemCatalog::USMALLINT:
                bs >> val16;

                if (val16 == 0)
                  valZero = true;

                colValue = val16;
                break;

              case execplan::CalpontSystemCatalog::DATE:
              case execplan::CalpontSystemCatalog::MEDINT:
              case execplan::CalpontSystemCatalog::INT:
              case execplan::CalpontSystemCatalog::UMEDINT:
              case execplan::CalpontSystemCatalog::UINT:
                bs >> val32;

                if (val32 == 0)
                  valZero = true;

                colValue = val32;
                break;

              case execplan::CalpontSystemCatalog::BIGINT:
              case execplan::CalpontSystemCatalog::DATETIME:
              case execplan::CalpontSystemCatalog::TIME:
              case execplan::CalpontSystemCatalog::TIMESTAMP:
              case execplan::CalpontSystemCatalog::UBIGINT:
                bs >> val64;

                if (val64 == 0)
                  valZero = true;

                colValue = val64;
                break;

              case execplan::CalpontSystemCatalog::DECIMAL:
                switch (colType.colWidth)
                {
                  case 1:
                  {
                    bs >> val8;
                    colValue = val8;
                    break;
                  }

                  case 2:
                  {
                    bs >> val16;
                    colValue = val16;
                    break;
                  }

                  case 4:
                  {
                    bs >> val32;
                    colValue = val32;
                    break;
                  }

                  default:
                  {
                    bs >> val64;
                    colValue = val64;
                    break;
                  }
                }

                break;

              case execplan::CalpontSystemCatalog::UDECIMAL:

                // UDECIMAL numbers may not be negative
                if (colType.colWidth == 1)
                {
                  bs >> val8;

                  // FIXME: val8/val16/val32/val64 are unsigned, so it is unclear what the
                  // is_negative() checks below are meant to detect.
                  if (utils::is_negative(val8) && val8 != joblist::TINYINTEMPTYROW &&
                      val8 != joblist::TINYINTNULL)
                  {
                    val8 = 0;
                    pushWarning = true;
                  }

                  colValue = val8;
                }
                else if (colType.colWidth == 2)
                {
                  bs >> val16;

                  if (utils::is_negative(val16) && val16 != joblist::SMALLINTEMPTYROW &&
                      val16 != joblist::SMALLINTNULL)
                  {
                    val16 = 0;
                    pushWarning = true;
                  }

                  colValue = val16;
                }
                else if (colType.colWidth == 4)
                {
                  bs >> val32;

                  if (utils::is_negative(val32) && val32 != joblist::INTEMPTYROW && val32 != joblist::INTNULL)
                  {
                    val32 = 0;
                    pushWarning = true;
                  }

                  colValue = val32;
                }
                else if (colType.colWidth == 8)
                {
                  bs >> val64;

                  if (utils::is_negative(val64) && val64 != joblist::BIGINTEMPTYROW &&
                      val64 != joblist::BIGINTNULL)
                  {
                    val64 = 0;
                    pushWarning = true;
                  }

                  colValue = val64;
                }

                break;

              case execplan::CalpontSystemCatalog::DOUBLE:
                bs >> val64;
                colValue = val64;
                break;

              case execplan::CalpontSystemCatalog::UDOUBLE:
                bs >> val64;
                memcpy(&valD, &val64, 8);

                if (valD < 0.0 && valD != static_cast<double>(joblist::DOUBLEEMPTYROW) &&
                    valD != static_cast<double>(joblist::DOUBLENULL))
                {
                  valD = 0.0;
                  pushWarning = true;
                }

                colValue = val64;
                break;

              case execplan::CalpontSystemCatalog::FLOAT:
                bs >> val32;
                colValue = val32;
                break;

              case execplan::CalpontSystemCatalog::UFLOAT:
                bs >> val32;
                memcpy(&valF, &val32, 4);

                if (valF < 0.0 && valF != static_cast<float>(joblist::FLOATEMPTYROW) &&
                    valF != static_cast<float>(joblist::FLOATNULL))
                {
                  valF = 0.0;
                  pushWarning = true;
                }

                colValue = val32;
                break;

              case execplan::CalpontSystemCatalog::CHAR:
              case execplan::CalpontSystemCatalog::VARCHAR:
              case execplan::CalpontSystemCatalog::TEXT:
              case execplan::CalpontSystemCatalog::BLOB:
                bs >> valStr;

                if (!valStr.isNull()) // null values do not require any padding or truncation.
                {
                  if (valStr.length() > (unsigned int)colType.colWidth)
                  {
                    valStr = NullString(valStr.unsafeStringRef().substr(0, colType.colWidth));
                    pushWarning = true;
                  }
                  else
                  {
                    if ((unsigned int)colType.colWidth > valStr.length())
                    {
                      // Pad null character to the string
                      valStr.resize(colType.colWidth, 0);
                    }
                  }
                }

                // FIXME: colValue is uint64_t (8 bytes)
                memcpy(&colValue, valStr.str(), valStr.length());
                break;

              default:
                rc = 1;
                err = IDBErrorInfo::instance()->errorMsg(ERR_DATATYPE_NOT_SUPPORT);
                break;
            }

            // check if autoincrement column and value is 0 or null
            if (colType.autoincrement && (isNULL || valZero))
            {
              ostringstream oss;
              oss << nextVal++;
              isNULL = false;

              try
              {
                nextValNeeded++;
                bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal);

                if (!reserved)
                {
                  err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
                  rc = 1;
                  return rc;
                }
              }
              catch (std::exception& ex)
              {
                err = ex.what();
                rc = 1;
                return rc;
              }

              colValue = nextVal;
            }

            if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
            {
              if (isNULL && colType.defaultValue.isNull())  // error out
              {
                Message::Args args;
                args.add(tableColName.column);
                err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
                rc = 1;
                return rc;
              }
              else if (isNULL && !(colType.defaultValue.isNull()))
              {
                memcpy(&colValue, colType.defaultValue.str(), colType.defaultValue.length());
                isNULL = false;
              }
            }

            //@Bug 1806
            if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
            {
              return rc;
            }

            if (pushWarning && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
              rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;

            colValuesList.push_back(colValue);
            //@Bug 2515. Only pass string values to write engine
            dicStrings.push_back(valStr);
          }

          dicStringList.push_back(dicStrings);
        }

        if (pushWarning)
        {
          colNames.push_back(tableColName.column);
          isWarningSet = true;
        }
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }
  }

  // call the write engine to write the rows
  int error = NO_ERROR;

  // fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3);
  // cout << "Batch inserting a row with transaction id " << txnid.id << endl;
  if (colValuesList.size() > 0)
  {
    if (NO_ERROR !=
        (error = fWEWrapper.insertColumnRecsBinary(
             txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, dbRootExtTrackerVec, 0,
             bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
    {
      if (error == ERR_BRM_DEAD_LOCK)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
        WErrorCodes ec;
        err = ec.errorString(error);
      }
      else if (error == ERR_BRM_VB_OVERFLOW)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
        err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
      }
      else
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
        WErrorCodes ec;
        err = ec.errorString(error);
      }
    }
  }

  if (fIsFirstBatchPm && isAutocommitOn)
  {
    // fWEWrapper.writeVBEnd(txnid.id, rangeList);
    fIsFirstBatchPm = false;
  }
  else if (fIsFirstBatchPm)
  {
    fIsFirstBatchPm = false;
  }

  if (isWarningSet && (rc == NO_ERROR))
  {
    rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
    // cout << "Got warning" << endl;
    Message::Args args;
    string cols = "'" + colNames[0] + "'";

    for (unsigned i = 1; i < colNames.size(); i++)
    {
      cols = cols + ", " + "'" + colNames[i] + "'";
    }

    args.add(cols);
    err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
  }

  // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
  return rc;
}

//------------------------------------------------------------------------------
// Wrap up a batch insert that ran with autocommit on.
// Reads transaction id, table OID and session id (32 bits each) from bs, then
// restarts bs and serializes into it one BulkSetHWMArg per non-dictionary
// extent cached in the table's TableMetaData, for the caller to send back.
// Also purges the touched files from the PrimProc fd cache (HDFS only), drops
// the cached table metadata, flushes dictionary blocks recorded for this
// transaction and removes the session's system catalog instances.
// err is not written by this function; the return value is always 0.
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err)
{
  // Request header: transaction id, table OID, session id.
  uint32_t word;
  bs >> word;
  const int txnID = word;
  uint32_t tableOid;
  bs >> tableOid;
  uint32_t sessionId;
  bs >> sessionId;

  BRM::DBRM dbrm;

  std::vector<BRM::BulkSetHWMArg> hwmArgs;
  TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
  ColsExtsInfoMap extentsByOid = tableMetaData->getColsExtsInfoMap();
  BulkSetHWMArg hwmArg;
  std::vector<BRM::FileInfo> touchedFiles;
  BRM::FileInfo fileInfo;

  // Walk every extent of every OID: all of them go into the purge list, but
  // only non-dictionary extents contribute an HWM argument.
  for (const auto& oidAndExtents : extentsByOid)
  {
    hwmArg.oid = oidAndExtents.first;
    fileInfo.oid = oidAndExtents.first;

    for (const auto& extent : oidAndExtents.second)
    {
      hwmArg.partNum = extent.partNum;
      hwmArg.segNum = extent.segNum;
      hwmArg.hwm = extent.hwm;

      if (!extent.isDict)
        hwmArgs.push_back(hwmArg);

      fileInfo.partitionNum = extent.partNum;
      fileInfo.dbRoot = extent.dbRoot;
      fileInfo.segmentNum = extent.segNum;
      fileInfo.compType = extent.compType;
      touchedFiles.push_back(fileInfo);
    }
  }

  // The reply replaces the request content of bs.
  bs.restart();
  serializeInlineVector(bs, hwmArgs);

  // The insert/bulk flags are raised, the transaction id is set, and the
  // flags are lowered again, in exactly this order.
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);

  fWEWrapper.setTransId(txnID);

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);

  fIsFirstBatchPm = true;

  if (idbdatafile::IDBPolicy::useHdfs() && !touchedFiles.empty())
    cacheutils::purgePrimProcFdCache(touchedFiles, Config::getLocalModuleID());

  TableMetaData::removeTableMetaData(tableOid);

  // MCOL-1160 For API bulk insert flush the PrimProc cached dictionary
  // blocks touched
  std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator dictEntry = fWEWrapper.getDictMap().find(txnID);

  if (dictEntry != fWEWrapper.getDictMap().end())
  {
    std::vector<BRM::LBID_t> dictFlushBlks;
    cerr << "API Flushing blocks: ";

    for (const BRM::LBID_t lbid : dictEntry->second)
    {
      cerr << lbid << ", ";
      dictFlushBlks.push_back(lbid);
    }

    cerr << endl;
    cacheutils::flushPrimProcAllverBlocks(dictFlushBlks);
    fWEWrapper.getDictMap().erase(txnID);
  }

  // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
  return 0;
}

//------------------------------------------------------------------------------
// End-of-batch-insert HWM processing for one table.
// Reads from bs, in order: transaction id (32 bits), autocommit flag (8 bits),
// table OID (32 bits) and one trailing 8-bit flag.
// The extent info cached in TableMetaData is collected per column, the column
// HWMs are validated ("Ending" stage), and then:
//   - autocommit on : bs is restarted; chunks are flushed only while
//                     fWEWrapper.getIsInsert() is set; the table metadata is
//                     dropped only when the trailing flag is non-zero.
//   - autocommit off: the table metadata is dropped, chunks are flushed and
//                     the BulkSetHWMArg list of the non-dictionary extents is
//                     serialized into the restarted bs.
// err is cleared on entry and receives the error text on failure.
// returns 0 for success; returns 1 if error occurs
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processBatchInsertHwm(messageqcpp::ByteStream& bs, std::string& err)
{
  uint8_t tmp8, rc = 0;
  err.clear();
  uint32_t tmp32, tableOid;
  int txnID;
  bool isAutoCommitOn;

  bs >> tmp32;
  txnID = tmp32;
  bs >> tmp8;
  isAutoCommitOn = (tmp8 != 0);
  bs >> tmp32;
  tableOid = tmp32;
  // Trailing flag; consulted only in the autocommit-on path below, where a
  // non-zero value causes the table metadata to be removed.
  bs >> tmp8;
  // cout << "processBatchInsertHwm: tranid:isAutoCommitOn = " <<txnID <<":"<< (int)isAutoCommitOn << endl;
  std::vector<BRM::FileInfo> files;
  std::vector<BRM::OID_t> oidsToFlush;

  BRM::FileInfo aFile;
  TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableOid);
  ColsExtsInfoMap colsExtsInfoMap = aTableMetaData->getColsExtsInfoMap();
  CalpontSystemCatalog::RIDList ridList;

  //@Bug 5996. Validate hwm before set them
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(0);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  CalpontSystemCatalog::OID tableAUXColOid;

  try
  {
    CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid);
    ridList = systemCatalogPtr->columnRIDs(tableName);
    tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName);
  }
  catch (exception& ex)
  {
    // Catalog lookup failed: report it and reset the batch state.
    err = ex.what();
    rc = 1;
    TableMetaData::removeTableMetaData(tableOid);

    fIsFirstBatchPm = true;
    fWEWrapper.setIsInsert(true);
    fWEWrapper.setBulkFlag(true);
    return rc;
  }

  // MCOL-5021 Valid AUX column OID for a table is > 3000
  // Tables that were created before this feature was added will have
  // tableAUXColOid = 0
  bool hasAUXCol = (tableAUXColOid > 3000);

  if (hasAUXCol)
  {
    // The AUX column is appended after the last regular column.
    CalpontSystemCatalog::ROPair auxRoPair;
    auxRoPair.rid = ridList.back().rid + 1;
    auxRoPair.objnum = tableAUXColOid;
    ridList.push_back(auxRoPair);
  }

  std::vector<DBRootExtentInfo> colDBRootExtentInfo;
  DBRootExtentInfo aExtentInfo;
  std::vector<DBRootExtentInfo> auxColDBRootExtentInfo;

  // For every OID: remember all of its files for the later cache purge and
  // pick one representative extent (non-dictionary OIDs only) for validation.
  for (const auto& colEntry : colsExtsInfoMap)
  {
    aFile.oid = colEntry.first;
    oidsToFlush.push_back(aFile.oid);
    aExtentInfo.fPartition = 0;
    aExtentInfo.fDbRoot = 0;
    aExtentInfo.fSegment = 0;
    aExtentInfo.fLocalHwm = 0;
    bool isDict = false;

    for (const auto& ext : colEntry.second)
    {
      aFile.partitionNum = ext.partNum;
      aFile.dbRoot = ext.dbRoot;
      aFile.segmentNum = ext.segNum;
      aFile.compType = ext.compType;
      files.push_back(aFile);

      if (!ext.isDict)
      {
        // NOTE(review): the last clause compares segNum against fLocalHwm; a
        // comparison of hwm against fLocalHwm may have been intended. It is
        // left unchanged here because it decides which extent is handed to
        // validateColumnHWMs() - confirm before altering.
        if ((ext.partNum > aExtentInfo.fPartition) ||
            ((ext.partNum == aExtentInfo.fPartition) && (ext.segNum > aExtentInfo.fSegment)) ||
            ((ext.partNum == aExtentInfo.fPartition) && (ext.segNum == aExtentInfo.fSegment) &&
             (ext.segNum > aExtentInfo.fLocalHwm)))
        {
          aExtentInfo.fPartition = ext.partNum;
          aExtentInfo.fDbRoot = ext.dbRoot;
          aExtentInfo.fSegment = ext.segNum;
          aExtentInfo.fLocalHwm = ext.hwm;
        }
      }
      else
      {
        isDict = true;
      }
    }

    if (!isDict)
    {
      if (hasAUXCol && (static_cast<uint32_t>(tableAUXColOid) == colEntry.first))
      {
        auxColDBRootExtentInfo.push_back(aExtentInfo);
      }
      else
      {
        colDBRootExtentInfo.push_back(aExtentInfo);
      }
    }
  }

  // The AUX column was appended last to ridList, so its extent info goes last
  // as well.
  if (hasAUXCol)
  {
    colDBRootExtentInfo.insert(colDBRootExtentInfo.end(), auxColDBRootExtentInfo.begin(),
                               auxColDBRootExtentInfo.end());
  }

  // Keep the validation result in an int, as this file does for other
  // WriteEngine error codes, so that the code handed to
  // WErrorCodes::errorString() is not first narrowed to 8 bits (which could
  // garble the message or make a failure look like 0).
  const int validateRc =
      validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Ending", hasAUXCol);

  if (validateRc != 0)
  {
    WErrorCodes ec;
    err = ec.errorString(validateRc);
    err += " Check err.log for detailed information.";
    TableMetaData::removeTableMetaData(tableOid);

    fIsFirstBatchPm = true;
    fWEWrapper.setIsInsert(true);
    fWEWrapper.setBulkFlag(true);
    rc = 1;
    return rc;
  }

  try
  {
    if (isAutoCommitOn)
    {
      bs.restart();

      if (fWEWrapper.getIsInsert())
      {
        // @bug5333, up to here, rc == 0, but flushchunk may fail.
        rc = processBatchInsertHwmFlushChunks(tableOid, txnID, files, oidsToFlush, err);
      }

      if (tmp8 != 0)
        TableMetaData::removeTableMetaData(tableOid);

      return rc;  // will set hwm with version commit.
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  // Handle case where isAutoCommitOn is false
  // NOTE(review): dbrm is not referenced below; kept because SOURCE does not
  // show whether constructing it has effects that matter here.
  BRM::DBRM dbrm;
  std::vector<BRM::BulkSetHWMArg> setHWMArgs;
  BulkSetHWMArg aArg;

  for (const auto& colEntry : colsExtsInfoMap)
  {
    aArg.oid = colEntry.first;

    for (const auto& ext : colEntry.second)
    {
      aArg.partNum = ext.partNum;
      aArg.segNum = ext.segNum;
      aArg.hwm = ext.hwm;

      //@Bug 6029 dictionary store files already set hwm.
      if (!ext.isDict)
        setHWMArgs.push_back(aArg);
    }
  }

  TableMetaData::removeTableMetaData(tableOid);

  fIsFirstBatchPm = true;
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);

  rc = processBatchInsertHwmFlushChunks(tableOid, txnID, files, oidsToFlush, err);
  bs.restart();

  try
  {
    serializeInlineVector(bs, setHWMArgs);
  }
  catch (exception& ex)
  {
    // Append to errmsg in case we already have an error
    if (err.length() > 0)
      err += "; ";

    err += ex.what();
    rc = 1;
    return rc;
  }

  return rc;
}

//------------------------------------------------------------------------------
// Flush chunks for the specified table and transaction.
// Also confirms changes to DB files (for hdfs).
// files vector represents list of files to be purged from PrimProc cache.
// oidsToFlush  represents list of oids  to be flushed from PrimProc cache.
// (both are only used in the hdfs path, after the changes were confirmed).
// Afterwards, the following attributes are reset as follows:
//   fWEWrapper.setIsInsert(false);
//   fWEWrapper.setBulkFlag(false);
//   fIsFirstBatchPm = true;
// A system catalog failure returns before that reset is reached.
// err receives the error text when 1 is returned.
// returns 0 for success; returns 1 if error occurs
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processBatchInsertHwmFlushChunks(uint32_t tblOid, int txnID,
                                                            const std::vector<BRM::FileInfo>& files,
                                                            const std::vector<BRM::OID_t>& oidsToFlush,
                                                            std::string& err)
{
  uint8_t rc = 0;
  std::map<uint32_t, uint32_t> oids;
  CalpontSystemCatalog::TableName aTableName;
  CalpontSystemCatalog::RIDList ridList;
  CalpontSystemCatalog::DictOIDList dictOids;
  CalpontSystemCatalog::OID tableAUXColOid;

  try
  {
    boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
        CalpontSystemCatalog::makeCalpontSystemCatalog(txnID);
    aTableName = systemCatalogPtr->tableName(tblOid);
    ridList = systemCatalogPtr->columnRIDs(aTableName, true);
    dictOids = systemCatalogPtr->dictOIDs(aTableName);
    tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(aTableName);
  }
  catch (exception& ex)
  {
    std::ostringstream ossErr;
    ossErr << "System Catalog error for table OID " << tblOid;

    // Include tbl name in msg unless exception occurred before we got it
    if (aTableName.table.length() > 0)
      ossErr << '(' << aTableName << ')';

    ossErr << "; " << ex.what();
    err = ossErr.str();
    rc = 1;
    return rc;
  }

  // MCOL-5021 Valid AUX column OID for a table is > 3000
  // Tables that were created before this feature was added will have
  // tableAUXColOid = 0
  if (tableAUXColOid > 3000)
  {
    CalpontSystemCatalog::ROPair auxRoPair;
    auxRoPair.rid = ridList.back().rid + 1;
    auxRoPair.objnum = tableAUXColOid;
    ridList.push_back(auxRoPair);
  }

  // Build the OID -> OID map handed to flushChunks(): every column OID plus
  // every dictionary OID of the table.
  for (const auto& ridEntry : ridList)
  {
    oids[ridEntry.objnum] = ridEntry.objnum;
  }

  for (const auto& dictEntry : dictOids)
  {
    oids[dictEntry.dictOID] = dictEntry.dictOID;
  }

  fWEWrapper.setTransId(txnID);

  // @bug5333, up to here, rc == 0, but flushchunk may fail.
  // The flush result is kept in an int, as this file does for other
  // WriteEngine error codes, so that neither the success test nor
  // WErrorCodes::errorString() sees a value narrowed to 8 bits.
  const int flushRc = fWEWrapper.flushChunks(0, oids);

  if (flushRc == NO_ERROR)
  {
    // Confirm changes to db files "only" if no error up to this point
    if (idbdatafile::IDBPolicy::useHdfs())
    {
      std::string eMsg;
      ConfirmHdfsDbFile confirmHdfs;
      int confirmDbRc = confirmHdfs.confirmDbFileListFromMetaFile(tblOid, eMsg);

      if (confirmDbRc == NO_ERROR)
      {
        // The result is intentionally not checked: might want to log a
        // failure here, but it is not reported as fatal, as all changes were
        // confirmed.
        confirmHdfs.endDbFileListFromMetaFile(tblOid, true, eMsg);

        if (!files.empty())
          cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());

        cacheutils::flushOIDsFromCache(oidsToFlush);
      }
      else
      {
        ostringstream ossErr;
        ossErr << "Error confirming changes to table " << aTableName << "; " << eMsg;
        err = ossErr.str();
        rc = 1;
      }
    }
  }
  else  // flushChunks error
  {
    WErrorCodes ec;
    std::ostringstream ossErr;
    ossErr << "Error flushing chunks for table " << aTableName << "; " << ec.errorString(flushRc);
    err = ossErr.str();
    rc = 1;  // callers only get the generic failure value
  }

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fIsFirstBatchPm = true;

  return rc;
}

// Stub for committing a batch insert that ran with autocommit off.
// The original note lists the work meant for this step: commit all versioned
// blocks, set hwm, update casual partition. None of it is done here; bs and
// err are left untouched and 0 is always returned.
uint8_t WE_DMLCommandProc::commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err)
{
  return 0;
}

// Roll back a batch insert that ran with autocommit on, via a bulk rollback.
// Reads session id (32 bits), table lock id (64 bits) and table OID (32 bits)
// from bs. Returns 1 with err set when the table OID cannot be resolved to a
// name; otherwise returns whatever fWEWrapper.bulkRollback() yields, stored
// in a uint8_t (err is passed through to that call).
uint8_t WE_DMLCommandProc::rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err)
{
  uint32_t sessionID;
  uint64_t lockID;
  uint32_t tableOid;
  bs >> sessionID;
  bs >> lockID;
  bs >> tableOid;

  // Bulkrollback needs the qualified table name, so resolve the OID first.
  boost::shared_ptr<CalpontSystemCatalog> catalog = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  CalpontSystemCatalog::TableName resolvedName;

  try
  {
    resolvedName = catalog->tableName(tableOid);
  }
  catch (...)
  {
    // Any lookup failure is reported as an unknown table.
    err = std::string("No such table for oid ") + std::to_string(tableOid);
    return 1;
  }

  string qualifiedName = resolvedName.schema + "." + resolvedName.table;
  string applName("BatchInsert");

  // NOTE(review): if bulkRollback() returns a code wider than 8 bits, this
  // store narrows it - confirm against its declaration.
  uint8_t rc = fWEWrapper.bulkRollback(tableOid, lockID, qualifiedName, applName, false, err);

  // The batch state is reset regardless of the rollback outcome.
  fIsFirstBatchPm = true;
  TableMetaData::removeTableMetaData(tableOid);
  return rc;
}

// Stub for rolling back a batch insert that ran with autocommit off.
// The original note describes the step as rolling back all versioned blocks;
// nothing is done here. bs and err are left untouched and 0 is always
// returned.
uint8_t WE_DMLCommandProc::rollbackBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err)
{
  return 0;
}

uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, std::string& err,
                                         ByteStream::quadbyte& PMId, uint64_t& blocksChanged)
{
  uint8_t rc = 0;
  uint32_t tmp32, sessionID;
  TxnID txnId;
  bs >> PMId;
  bs >> tmp32;
  txnId = tmp32;
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.setTransId(txnId);

  if (!rowGroups[txnId])  // meta data
  {
    rowGroups[txnId] = new rowgroup::RowGroup();
    rowGroups[txnId]->deserialize(bs);
    uint8_t pkgType;
    bs >> pkgType;
    cpackages[txnId].read(bs);

    rc = fWEWrapper.startTransaction(txnId);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      err = ec.errorString(rc);
    }

    return rc;
  }

  bool pushWarning = false;
  rowgroup::RGData rgData;
  rgData.deserialize(bs);
  rowGroups[txnId]->setData(&rgData);
  // rowGroups[txnId]->setData(const_cast<uint8_t*>(bs.buf()));
  // get rows and values
  rowgroup::Row row;
  rowGroups[txnId]->initRow(&row);
  utils::NullString value;
  uint32_t rowsThisRowgroup = rowGroups[txnId]->getRowCount();
  uint32_t columnsSelected = rowGroups[txnId]->getColumnCount();
  std::vector<execplan::CalpontSystemCatalog::ColDataType> fetchColTypes = rowGroups[txnId]->getColTypes();
  std::vector<uint32_t> fetchColScales = rowGroups[txnId]->getScale();
  std::vector<uint32_t> fetchColColwidths;

  for (uint32_t i = 0; i < columnsSelected; i++)
  {
    fetchColColwidths.push_back(rowGroups[txnId]->getColumnWidth(i));
  }

  WriteEngine::ColTupleList aColList;
  WriteEngine::ColTuple colTuple;
  WriteEngine::ColStructList colStructList;
  WriteEngine::ColStruct colStruct;
  WriteEngine::ColValueList colValueList;
  WriteEngine::RIDList rowIDLists;
  WriteEngine::CSCTypesList cscColTypeList;

  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryStruct dctnryStruct;
  WriteEngine::DctnryValueList dctnryValueList;

  CalpontSystemCatalog::TableName tableName;
  CalpontSystemCatalog::TableColName tableColName;
  DMLTable* tablePtr = cpackages[txnId].get_Table();
  RowList rows = tablePtr->get_RowList();
  dmlpackage::ColumnList columnsUpdated = rows[0]->get_ColumnList();
  tableColName.table = tableName.table = tablePtr->get_TableName();
  tableColName.schema = tableName.schema = tablePtr->get_SchemaName();
  tableColName.column = columnsUpdated[0]->get_Name();

  sessionID = cpackages[txnId].get_SessionID();

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  CalpontSystemCatalog::OID oid = 0;
  CalpontSystemCatalog::ROPair tableRO;

  long timeZone = cpackages[txnId].get_TimeZone();

  try
  {
    tableRO = systemCatalogPtr->tableRID(tableName);
    oid = systemCatalogPtr->lookupOID(tableColName);
  }
  catch (std::exception& ex)
  {
    rc = 1;
    ostringstream oss;
    oss << "lookupOID got exception " << ex.what() << " with column " << tableColName.schema
        << "." + tableColName.table << "." << tableColName.column;
    err = oss.str();
  }
  catch (...)
  {
    rc = 1;
    ostringstream oss;
    oss << "lookupOID got unknown exception with column " << tableColName.schema << "." << tableColName.table
        << "." << tableColName.column;
    err = oss.str();
  }

  if (rc != 0)
    return rc;

  rowGroups[txnId]->getRow(0, &row);
  CalpontSystemCatalog::RID rid = row.getRid();
  uint16_t dbRoot, segment, blockNum;
  uint32_t partition;
  uint8_t extentNum;
  // Get the file information from rowgroup
  dbRoot = rowGroups[txnId]->getDBRoot();
  rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum);
  colStruct.fColPartition = partition;
  colStruct.fColSegment = segment;
  colStruct.fColDbRoot = dbRoot;
  dctnryStruct.fColPartition = partition;
  dctnryStruct.fColSegment = segment;
  dctnryStruct.fColDbRoot = dbRoot;
  TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableRO.objnum);
  // Build to be updated column structure and values
  int error = 0;
  unsigned fetchColPos = 0;
  bool ridsFetched = false;
  bool isNull = false;
  boost::any datavalue;
  int64_t intColVal = 0;
  // timer.start("fetch values");
  std::vector<string> colNames;

  // for query stats
  boost::scoped_array<CalpontSystemCatalog::ColType> colTypes(
      new CalpontSystemCatalog::ColType[columnsUpdated.size()]);
  boost::scoped_array<int> preBlkNums(new int[columnsUpdated.size()]);
  boost::scoped_array<OID> oids(new OID[columnsUpdated.size()]);

  BRMWrapper::setUseVb(true);
  for (unsigned int j = 0; j < columnsUpdated.size(); j++)
  {
    // timer.start("lookupsyscat");
    tableColName.column = columnsUpdated[j]->get_Name();

    try
    {
      oids[j] = systemCatalogPtr->lookupOID(tableColName);
      colTypes[j] = systemCatalogPtr->colType(oids[j]);
    }
    catch (std::exception& ex)
    {
      rc = 1;
      ostringstream oss;
      oss << "colType got exception " << ex.what() << " with column oid " << oid;
      err = oss.str();
    }
    catch (...)
    {
      rc = 1;
      ostringstream oss;
      oss << "colType got unknown exception with column oid " << oid;
      err = oss.str();
    }

    if (rc != 0)
      return rc;

    preBlkNums[j] = -1;
  }

  for (unsigned int j = 0; j < columnsUpdated.size(); j++)
  {
    WriteEngine::ColTupleList colTupleList;
    CalpontSystemCatalog::ColType colType = colTypes[j];
    oid = oids[j];
    colStruct.dataOid = oid;
    colStruct.colDataType = colType.colDataType;
    colStruct.tokenFlag = false;
    colStruct.fCompressionType = colType.compressionType;
    tableColName.column = columnsUpdated[j]->get_Name();

    if (!ridsFetched)
    {
      // querystats
      uint64_t relativeRID = 0;

      for (unsigned i = 0; i < rowsThisRowgroup; i++)
      {
        rowGroups[txnId]->getRow(i, &row);
        rid = row.getRid();
        relativeRID = rid - rowGroups[txnId]->getBaseRid();
        rid = relativeRID;
        convertToRelativeRid(rid, extentNum, blockNum);
        rowIDLists.push_back(rid);

        uint32_t colWidth = colTypes[j].colWidth;

        if (colWidth > 8 && !(colTypes[j].colDataType == CalpontSystemCatalog::DECIMAL ||
                              colTypes[j].colDataType == CalpontSystemCatalog::UDECIMAL))
        {
          colWidth = 8;
        }
        else if (colWidth >= datatypes::MAXDECIMALWIDTH)
        {
          colWidth = datatypes::MAXDECIMALWIDTH;
        }

        int rrid = (int)relativeRID / (BYTE_PER_BLOCK / colWidth);
        // populate stats.blocksChanged
        if (rrid > preBlkNums[j])
        {
          preBlkNums[j] = rrid;
          blocksChanged++;
        }
      }

      ridsFetched = true;
    }

    bool pushWarn = false;
    bool nameNeeded = false;

    if (isDictCol(colType))
    {
      colStruct.colWidth = 8;
      colStruct.tokenFlag = true;
      dctnryStruct.dctnryOid = colType.ddn.dictOID;
      dctnryStruct.columnOid = colStruct.dataOid;
      dctnryStruct.fCompressionType = colType.compressionType;
      dctnryStruct.fCharsetNumber = colType.charsetNumber;
      dctnryStruct.colWidth = colType.colWidth;

      if (NO_ERROR != (error = fWEWrapper.openDctnry(txnId, dctnryStruct, false)))  // @bug 5572 HDFS tmp file
      {
        WErrorCodes ec;
        err = ec.errorString(error);
        rc = error;
        return rc;
      }

      ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStruct.dctnryOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == dctnryStruct.fColDbRoot) && (it->partNum == dctnryStruct.fColPartition) &&
            (it->segNum == dctnryStruct.fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = dctnryStruct.fColDbRoot;
        aExt.partNum = dctnryStruct.fColPartition;
        aExt.segNum = dctnryStruct.fColSegment;
        aExt.compType = dctnryStruct.fCompressionType;
        aExt.isDict = true;
        aColExtsInfo.push_back(aExt);
      }

      aTableMetaData->setColExtsInfo(dctnryStruct.dctnryOid, aColExtsInfo);

      if (columnsUpdated[j]->get_isFromCol())
      {
        for (unsigned i = 0; i < rowsThisRowgroup; i++)
        {
          value.dropString();
          rowGroups[txnId]->getRow(i, &row);

          if (row.isNullValue(fetchColPos))
          {
            if ((colType.defaultValue.length() <= 0) &&
                (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
            {
              rc = 1;
              Message::Args args;
              args.add(tableColName.column);
              err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
              return rc;
            }
            else if (colType.defaultValue.length() > 0)
            {
              value = colType.defaultValue;

              if (value.length() > (unsigned int)colType.colWidth)
              {
                value.assign(value.safeString("").substr(0, colType.colWidth));
                pushWarn = true;

                if (!pushWarning)
                {
                  pushWarning = true;
                }

                if (pushWarn)
                  nameNeeded = true;
              }

              WriteEngine::DctnryTuple dctTuple;
              dctTuple.sigValue = (unsigned char*)value.str();
              dctTuple.sigSize = value.length();
              dctTuple.isNull = false;

              error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);

              if (error != NO_ERROR)
              {
                fWEWrapper.closeDctnry(txnId, colType.compressionType);
                return false;
              }

              colTuple.data = dctTuple.token;
              colTupleList.push_back(colTuple);
            }
            else
            {
              WriteEngine::Token nullToken;
              colTuple.data = nullToken;
              colTupleList.push_back(colTuple);
            }

            continue;
          }

          switch (fetchColTypes[fetchColPos])
          {
            case CalpontSystemCatalog::DATE:
            {
              intColVal = row.getUintField<4>(fetchColPos);
              value.assign(DataConvert::dateToString(intColVal));
              break;
            }

            case CalpontSystemCatalog::DATETIME:
            {
              intColVal = row.getUintField<8>(fetchColPos);
              value.assign(DataConvert::datetimeToString(intColVal, colType.precision));
              break;
            }

            case CalpontSystemCatalog::TIMESTAMP:
            {
              intColVal = row.getUintField<8>(fetchColPos);
              value.assign(DataConvert::timestampToString(intColVal, timeZone, colType.precision));
              break;
            }

            case CalpontSystemCatalog::TIME:
            {
              intColVal = row.getIntField<8>(fetchColPos);
              value.assign(DataConvert::timeToString(intColVal, colType.precision));
              break;
            }

            case CalpontSystemCatalog::CHAR:
            case CalpontSystemCatalog::VARCHAR:
            {
              value = row.getStringField(fetchColPos);
              if (!value.isNull())
              {
                unsigned i = strlen(value.str());
                value.assign(value.unsafeStringRef().substr(0, i));
              }
              break;
            }

            case CalpontSystemCatalog::VARBINARY:
            case CalpontSystemCatalog::BLOB:
            case CalpontSystemCatalog::TEXT:
            {
              value.assign(row.getVarBinaryField(fetchColPos), row.getVarBinaryLength(fetchColPos));
              break;
            }

            case CalpontSystemCatalog::DECIMAL:
            case CalpontSystemCatalog::UDECIMAL:
            {
              if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH)
              {
                datatypes::Decimal dec(row.getTSInt128Field(fetchColPos), fetchColScales[fetchColPos],
                                       rowGroups[txnId]->getPrecision()[fetchColPos]);
                value.assign(dec.toString(true));
                break;
              }
            }
              /* fall through */

            case CalpontSystemCatalog::BIGINT:
            case CalpontSystemCatalog::UBIGINT:
            case CalpontSystemCatalog::INT:
            case CalpontSystemCatalog::UINT:
            case CalpontSystemCatalog::MEDINT:
            case CalpontSystemCatalog::UMEDINT:
            case CalpontSystemCatalog::SMALLINT:
            case CalpontSystemCatalog::USMALLINT:
            case CalpontSystemCatalog::TINYINT:
            case CalpontSystemCatalog::UTINYINT:
            {
              {
                intColVal = row.getIntField(fetchColPos);

                if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDECIMAL && intColVal < 0)
                {
                  intColVal = 0;
                }

                if (fetchColScales[fetchColPos] <= 0)
                {
                  ostringstream os;

                  if (isUnsigned(fetchColTypes[fetchColPos]))
                    os << static_cast<uint64_t>(intColVal);
                  else
                    os << intColVal;

                  value.assign(os.str());
                }
                else
                {
                  datatypes::Decimal dec(intColVal, fetchColScales[fetchColPos],
                                         rowGroups[txnId]->getPrecision()[fetchColPos]);
                  value.assign(dec.toString());
                }
              }
              break;
            }

            // In this case, we're trying to load a double output column with float data. This is the
            // case when you do sum(floatcol), e.g.
            case CalpontSystemCatalog::FLOAT:
            case CalpontSystemCatalog::UFLOAT:
            {
              float dl = row.getFloatField(fetchColPos);

              if (dl == std::numeric_limits<float>::infinity())
                continue;

              if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0)
              {
                dl = 0.0;
              }

              ostringstream os;
              //@Bug 3350 fix the precision.
              os << setprecision(7) << dl;
              value.assign(os.str());
              break;
            }

            case CalpontSystemCatalog::DOUBLE:
            case CalpontSystemCatalog::UDOUBLE:
            {
              double dl = row.getDoubleField(fetchColPos);

              if (dl == std::numeric_limits<double>::infinity())
                continue;

              if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0)
              {
                dl = 0.0;
              }

              ostringstream os;
              //@Bug 3350 fix the precision.
              os << setprecision(16) << dl;
              value.assign(os.str());
              break;
            }

            case CalpontSystemCatalog::LONGDOUBLE:
            {
              long double dll = row.getLongDoubleField(fetchColPos);

              if (dll == std::numeric_limits<long double>::infinity())
                continue;

              ostringstream os;
              //@Bug 3350 fix the precision.
              os << setprecision(19) << dll;
              value.assign(os.str());
              break;
            }

            default:  // treat as int64
            {
              ostringstream os;
              intColVal = row.getUintField<8>(fetchColPos);
              os << intColVal;
              value.assign(os.str());
              break;
            }
          }

          uint32_t funcScale = columnsUpdated[j]->get_funcScale();

          if (funcScale != 0)
          {
            string str = value.safeString("");
            string::size_type pos = str.find_first_of(".");  // decimal point

            if (pos >= str.length())
              str.insert(str.length(), ".");

            // padding 0 if needed
            pos = str.find_first_of(".");
            uint32_t digitsAfterPoint = str.length() - pos - 1;

            if (digitsAfterPoint < funcScale)
            {
              for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++)
                str += "0";
            }

            value.assign(str);
          }

          // check data length
          // trim the string if needed
          if (value.length() > (unsigned int)colType.colWidth)
          {
            value.assign(value.unsafeStringRef().substr(0, colType.colWidth));

            if (!pushWarn)
              pushWarn = true;

            if (!pushWarning)
              pushWarning = true;

            if (pushWarn)
              nameNeeded = true;
          }

          WriteEngine::DctnryTuple dctTuple;
          dctTuple.sigValue = (unsigned char*)value.str();
          dctTuple.sigSize = value.length();
          dctTuple.isNull = false;

          error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);

          if (error != NO_ERROR)
          {
            fWEWrapper.closeDctnry(txnId, colType.compressionType);
            rc = error;
            WErrorCodes ec;
            err = ec.errorString(error);
            return rc;
          }

          colTuple.data = dctTuple.token;
          colTupleList.push_back(colTuple);
        }

        if (colType.compressionType == 0)
          fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
        else
          fWEWrapper.closeDctnry(txnId, colType.compressionType, false);

        fetchColPos++;
      }
      else  // constant
      {
        if (columnsUpdated[j]->get_isnull())
        {
          if ((colType.defaultValue.length() <= 0) &&
              (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
          {
            rc = 1;
            Message::Args args;
            args.add(tableColName.column);
            err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
            return rc;
          }
          else if (colType.defaultValue.length() > 0)
          {
            value = colType.defaultValue;

            if (value.length() > (unsigned int)colType.colWidth)
            {
              value.assign(value.safeString("").substr(0, colType.colWidth));
              pushWarn = true;

              if (!pushWarning)
              {
                pushWarning = true;
              }

              if (pushWarn)
                nameNeeded = true;
            }

            WriteEngine::DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)value.str();
            dctTuple.sigSize = value.length();
            dctTuple.isNull = false;
            error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);

            if (error != NO_ERROR)
            {
              fWEWrapper.closeDctnry(txnId, colType.compressionType);
              rc = error;
              WErrorCodes ec;
              err = ec.errorString(error);
              return rc;
            }

            colTuple.data = dctTuple.token;

            if (colType.compressionType == 0)
              fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
            else
              fWEWrapper.closeDctnry(txnId, colType.compressionType,
                                     false);  // Constant only need to tokenize once.
          }
          else
          {
            WriteEngine::Token nullToken;
            colTuple.data = nullToken;
          }
        }
        else
        {
          idbassert(!columnsUpdated[j]->get_DataVector()[0].isNull());
          value = columnsUpdated[j]->get_DataVector()[0];

          if (value.length() > (unsigned int)colType.colWidth)
          {
            value.assign(value.unsafeStringRef().substr(0, colType.colWidth));
            pushWarn = true;

            if (!pushWarning)
            {
              pushWarning = true;
            }

            if (pushWarn)
              nameNeeded = true;
          }

          WriteEngine::DctnryTuple dctTuple;
          dctTuple.sigValue = (unsigned char*)value.str();
          dctTuple.sigSize = value.length();
          dctTuple.isNull = false;
          error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);

          if (error != NO_ERROR)
          {
            fWEWrapper.closeDctnry(txnId, colType.compressionType);
            rc = error;
            WErrorCodes ec;
            err = ec.errorString(error);
            return rc;
          }

          colTuple.data = dctTuple.token;

          if (colType.compressionType == 0)
            fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
          else
            fWEWrapper.closeDctnry(txnId, colType.compressionType,
                                   false);  // Constant only need to tokenize once.
        }

        for (unsigned row = 0; row < rowsThisRowgroup; row++)
          colTupleList.push_back(colTuple);
      }
    }
    else  // Non dictionary column
    {
      colStruct.colWidth = colType.colWidth;

      if (columnsUpdated[j]->get_isFromCol())
      {
        for (unsigned i = 0; i < rowsThisRowgroup; i++)
        {
          rowGroups[txnId]->getRow(i, &row);

          if (row.isNullValue(fetchColPos))
          {
            isNull = true;
            value.dropString();
          }
          else
          {
            isNull = false;

            switch (fetchColTypes[fetchColPos])
            {
              case CalpontSystemCatalog::DATE:
              {
                intColVal = row.getUintField<4>(fetchColPos);
                value.assign(DataConvert::dateToString(intColVal));
                break;
              }

              case CalpontSystemCatalog::DATETIME:
              {
                intColVal = row.getUintField<8>(fetchColPos);
                value.assign(DataConvert::datetimeToString(intColVal, colType.precision));
                break;
              }

              case CalpontSystemCatalog::TIMESTAMP:
              {
                intColVal = row.getUintField<8>(fetchColPos);
                value.assign(DataConvert::timestampToString(intColVal, timeZone, colType.precision));
                break;
              }

              case CalpontSystemCatalog::TIME:
              {
                intColVal = row.getIntField<8>(fetchColPos);
                value.assign(DataConvert::timeToString(intColVal, colType.precision));
                break;
              }

              case CalpontSystemCatalog::CHAR:
              case CalpontSystemCatalog::VARCHAR:
              {
                value = row.getStringField(fetchColPos);
                if (!value.isNull())
                {
                  unsigned i = strlen(value.str());
                  value.assign(value.safeString().substr(0, i)); // XXX: why???
                }
                break;
              }

              case CalpontSystemCatalog::VARBINARY:
              case CalpontSystemCatalog::BLOB:
              case CalpontSystemCatalog::TEXT:
              {
                value.assign(row.getVarBinaryField(fetchColPos), row.getVarBinaryLength(fetchColPos));
                break;
              }

              case CalpontSystemCatalog::DECIMAL:
              case CalpontSystemCatalog::UDECIMAL:
              {
                if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH)
                {
                  datatypes::Decimal dec(row.getTSInt128Field(fetchColPos), fetchColScales[fetchColPos],
                                         rowGroups[txnId]->getPrecision()[fetchColPos]);
                  value.assign(dec.toString(true));
                  break;
                }
              }
                /* fall through */

              case CalpontSystemCatalog::BIGINT:
              case CalpontSystemCatalog::UBIGINT:
              case CalpontSystemCatalog::INT:
              case CalpontSystemCatalog::UINT:
              case CalpontSystemCatalog::MEDINT:
              case CalpontSystemCatalog::UMEDINT:
              case CalpontSystemCatalog::SMALLINT:
              case CalpontSystemCatalog::USMALLINT:
              case CalpontSystemCatalog::TINYINT:
              case CalpontSystemCatalog::UTINYINT:
              {
                {
                  intColVal = row.getIntField(fetchColPos);

                  if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDECIMAL && intColVal < 0)
                  {
                    intColVal = 0;
                  }

                  if (fetchColScales[fetchColPos] <= 0)
                  {
                    ostringstream os;

                    if (isUnsigned(fetchColTypes[fetchColPos]))
                      os << static_cast<uint64_t>(intColVal);
                    else
                      os << intColVal;

                    value.assign(os.str());
                  }
                  else
                  {
                    datatypes::Decimal dec(intColVal, fetchColScales[fetchColPos],
                                           rowGroups[txnId]->getPrecision()[fetchColPos]);
                    value.assign(dec.toString());
                  }
                }
                break;
              }

              // In this case, we're trying to load a double output column with float data. This is the
              // case when you do sum(floatcol), e.g.
              case CalpontSystemCatalog::FLOAT:
              case CalpontSystemCatalog::UFLOAT:
              {
                float dl = row.getFloatField(fetchColPos);

                if (dl == std::numeric_limits<float>::infinity())
                  continue;

                if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0)
                {
                  dl = 0.0;
                }

                ostringstream os;
                //@Bug 3350 fix the precision.
                os << setprecision(7) << dl;
                value.assign(os.str());
                break;
              }

              case CalpontSystemCatalog::DOUBLE:
              case CalpontSystemCatalog::UDOUBLE:
              {
                double dl = row.getDoubleField(fetchColPos);

                if (dl == std::numeric_limits<double>::infinity())
                  continue;

                if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0)
                {
                  dl = 0.0;
                }

                ostringstream os;
                //@Bug 3350 fix the precision.
                os << setprecision(16) << dl;
                value.assign(os.str());
                break;
              }

              case CalpontSystemCatalog::LONGDOUBLE:
              {
                long double dll = row.getLongDoubleField(fetchColPos);

                if (dll == std::numeric_limits<long double>::infinity())
                  continue;

                ostringstream os;
                //@Bug 3350 fix the precision.
                os << setprecision(19) << dll;
                value.assign(os.str());
                break;
              }

              default:  // treat as int64
              {
                ostringstream os;
                intColVal = row.getUintField<8>(fetchColPos);
                os << intColVal;
                value.assign(os.str());
                break;
              }
            }
          }

          uint32_t funcScale = columnsUpdated[j]->get_funcScale();

          if (funcScale != 0)
          {
            string str = value.safeString("");
            string::size_type pos = str.find_first_of(".");  // decimal point

            if (pos >= str.length())
              str.insert(str.length(), ".");

            // padding 0 if needed
            pos = str.find_first_of(".");
            uint32_t digitsAfterPoint = str.length() - pos - 1;

            if (digitsAfterPoint < funcScale)
            {
              for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++)
                str += "0";
            }
            value.assign(str);
          }

          // Check NOT NULL constraint and default value
          if ((isNull) && (colType.defaultValue.isNull()) &&
              (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
          {
            rc = 1;
            Message::Args args;
            args.add(tableColName.column);
            err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
            return rc;
          }
          else if ((isNull) && (!colType.defaultValue.isNull()))
          {
            isNull = false;
            bool oneWarn = false;

            try
            {
              datavalue =
                  colType.convertColumnData(colType.defaultValue.safeString(""), pushWarn, timeZone, isNull, false, false);
            }
            catch (exception&)
            {
              //@Bug 2624. Error out on conversion failure
              rc = 1;
              Message::Args args;
              args.add(string("'") + colType.defaultValue.safeString() + string("'"));
              err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
            }

            if ((pushWarn) && (!oneWarn))
              oneWarn = true;

            colTuple.data = datavalue;
            colTupleList.push_back(colTuple);

            if (oneWarn)
              pushWarn = true;

            if (!pushWarning)
            {
              pushWarning = pushWarn;
            }

            if (pushWarn)
              nameNeeded = true;
          }
          else
          {
            try
            {
              datavalue = colType.convertColumnData(value.safeString(""), pushWarn, timeZone, isNull, false, false);
            }
            catch (exception&)
            {
              //@Bug 2624. Error out on conversion failure
              rc = 1;
              Message::Args args;
              args.add(string("'") + value.safeString() + string("'"));
              err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
              return rc;
            }

            colTuple.data = datavalue;
            colTupleList.push_back(colTuple);

            if (!pushWarning)
            {
              pushWarning = pushWarn;
            }

            if (pushWarn)
              nameNeeded = true;
          }
        }

        fetchColPos++;
      }
      else  // constant column
      {
        if (columnsUpdated[j]->get_isnull())
        {
          isNull = true;
        }
        else
        {
          isNull = false;
        }

        NullString inData;
        if (!isNull)
        {
          inData = columnsUpdated[j]->get_DataVector()[0];
        }

        uint64_t nextVal = 0;

        if (colType.autoincrement)
        {
          try
          {
            nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
            fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
          }
          catch (std::exception& ex)
          {
            err = ex.what();
            rc = 1;
            return rc;
          }
        }

        if (colType.autoincrement && (isNull || (inData.safeString("").compare("0") == 0)))
        {
          // reserve nextVal
          try
          {
            bool reserved = fDbrm.getAIRange(oid, rowsThisRowgroup, &nextVal);

            if (!reserved)
            {
              err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
              rc = 1;
              return rc;
            }
          }
          catch (std::exception& ex)
          {
            err = ex.what();
            rc = 1;
            return rc;
          }

          isNull = false;
          bool oneWarn = false;

          for (unsigned row = 0; row < rowsThisRowgroup; row++)
          {
            ostringstream oss;
            oss << nextVal++;
            inData.assign(oss.str());

            try
            {
              datavalue = colType.convertColumnData(inData, pushWarn, timeZone, false, false);
            }
            catch (exception&)
            {
              //@Bug 2624. Error out on conversion failure
              rc = 1;
              Message::Args args;
              args.add(string("'") + inData.safeString() + string("'"));
              err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
            }

            if ((pushWarn) && (!oneWarn))
              oneWarn = true;

            colTuple.data = datavalue;
            colTupleList.push_back(colTuple);
          }

          if (oneWarn)
            pushWarn = true;

          if (!pushWarning)
          {
            pushWarning = pushWarn;
          }

          if (pushWarn)
            nameNeeded = true;
        }
        else if (isNull && (colType.defaultValue.length() <= 0) &&
                 (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
        {
          rc = 1;
          Message::Args args;
          args.add(tableColName.column);
          err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
          return rc;
        }
        else if (isNull && (colType.defaultValue.length() > 0))
        {
          isNull = false;
          bool oneWarn = false;

          for (unsigned row = 0; row < rowsThisRowgroup; row++)
          {
            try
            {
              datavalue =
                  colType.convertColumnData(colType.defaultValue.safeString(), pushWarn, timeZone, isNull, false, false);
            }
            catch (exception&)
            {
              //@Bug 2624. Error out on conversion failure
              rc = 1;
              Message::Args args;
              args.add(string("'") + colType.defaultValue.safeString() + string("'"));
              err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
            }

            if ((pushWarn) && (!oneWarn))
              oneWarn = true;

            colTuple.data = datavalue;
            colTupleList.push_back(colTuple);
          }

          if (oneWarn)
            pushWarn = true;

          if (!pushWarning)
          {
            pushWarning = pushWarn;
          }

          if (pushWarn)
            nameNeeded = true;
        }
        else
        {
          try
          {
            datavalue = colType.convertColumnData(inData, pushWarn, timeZone, false, true);
          }
          catch (exception& ex)
          {
            //@Bug 2624. Error out on conversion failure
            rc = 1;
            cout << ex.what() << endl;
            Message::Args args;
            args.add(string("'") + inData.safeString() + string("'"));
            err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
            return rc;
          }

          colTuple.data = datavalue;

          if (!pushWarning)
          {
            pushWarning = pushWarn;
          }

          if (pushWarn)
            nameNeeded = true;

          for (unsigned row = 0; row < rowsThisRowgroup; row++)
            colTupleList.push_back(colTuple);
        }
      }
    }

    if (nameNeeded)
    {
      colNames.push_back(tableColName.column);
    }

    colStructList.push_back(colStruct);
    colValueList.push_back(colTupleList);
    cscColTypeList.push_back(colType);
  }  // end of bulding values and column structure.

  // timer.stop("fetch values");
  if (rowIDLists.size() > 0)
  {
    error = fWEWrapper.updateColumnRecs(txnId, cscColTypeList, colStructList, colValueList, rowIDLists,
                                        tableRO.objnum);
  }

  if (error != NO_ERROR)
  {
    rc = error;
    WErrorCodes ec;
    err = ec.errorString(error);

    if (error == ERR_BRM_DEAD_LOCK)
    {
      rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
    }
    else if (error == ERR_BRM_VB_OVERFLOW)
    {
      rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
      err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
    }
  }

  if (pushWarning)
  {
    rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
    Message::Args args;
    string cols = "'" + colNames[0] + "'";

    for (unsigned i = 1; i < colNames.size(); i++)
    {
      cols = cols + ", " + "'" + colNames[i] + "'";
    }

    args.add(cols);
    err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
  }

  return rc;
}

// Replies with the LBIDs recorded for one transaction.
//
// The transaction id is read from bs and looked up in the map obtained from
// fWEWrapper.getTxnMap(). bs is then restarted and the LBID list is serialized
// into it; the list is empty when the transaction has no record (or when the
// lookup itself throws, which is deliberately ignored).
//
// bs   - in: carries the transaction id; out: carries the serialized LBID list.
// err  - out: the exception text is appended (after "; " if err already holds
//        a message) when serialization throws.
// PMId - not used by this function.
//
// Returns 0 on success, 1 when serialization throws.
uint8_t WE_DMLCommandProc::getWrittenLbids(messageqcpp::ByteStream& bs, std::string& err,
                                           ByteStream::quadbyte& PMId)
{
  uint32_t txnId;
  vector<LBID_t> writtenLbids;

  bs >> txnId;

  // Local copy of the per-transaction LBID records held by the wrapper.
  std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> txnLbidRecords = fWEWrapper.getTxnMap();

  try
  {
    auto found = txnLbidRecords.find(txnId);

    if (found != txnLbidRecords.end())
      writtenLbids = found->second->m_LBIDs;
  }
  catch (...)
  {
    // Best effort: a failed lookup leaves the list empty.
  }

  bs.restart();

  try
  {
    serializeInlineVector(bs, writtenLbids);
  }
  catch (exception& ex)
  {
    // Keep any message already present and append this failure to it.
    if (!err.empty())
      err += "; ";

    err += ex.what();
    return 1;
  }

  return 0;
}

// Flushes the data files of a transaction and drops the per-transaction state
// kept by this processor.
//
// Reads flushCode, txnId and tableOid (in that order) from bs, then:
//   1. collects the dictionary OIDs of the table (only when tableOid >= 3000),
//   2. flushes the data files through fWEWrapper.flushDataFiles(),
//   3. deletes and removes the row group cached for txnId,
//   4. on HDFS, confirms the transaction, purges the PrimProc FD/block caches
//      for the files recorded in the table meta data and invalidates the
//      LBIDs written by the transaction,
//   5. removes the table meta data and the system catalog instances created
//      for txnId.
//
// bs  - in: request stream holding flushCode, txnId, tableOid.
// err - out: error text of the first failure; a later confirm failure is
//       appended after "; ".
//
// Returns 0 on success. On failure returns the write engine error code of the
// first failing step narrowed to uint8_t, or 1 when the system catalog lookup
// of the table name throws.
// NOTE(review): the narrowing int -> uint8_t can change the numeric value of
// codes above 255; callers should only rely on zero / non-zero.
uint8_t WE_DMLCommandProc::processFlushFiles(messageqcpp::ByteStream& bs, std::string& err)
{
  uint8_t rc = 0;
  uint32_t flushCode, txnId, tableOid;
  int error;
  bs >> flushCode;
  bs >> txnId;
  bs >> tableOid;
  std::map<uint32_t, uint32_t> oids;
  CalpontSystemCatalog::TableName aTableName;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(txnId);
  CalpontSystemCatalog::DictOIDList dictOids;

  // NOTE(review): 3000 presumably marks the first non-system table OID - confirm.
  if (tableOid >= 3000)
  {
    try
    {
      aTableName = systemCatalogPtr->tableName(tableOid);
    }
    catch (...)
    {
      err = std::string("Systemcatalog error for tableoid ") + std::to_string(tableOid);
      rc = 1;
      return rc;
    }

    dictOids = systemCatalogPtr->dictOIDs(aTableName);

    // flushDataFiles() takes the dictionary OIDs as an OID -> OID map.
    for (unsigned i = 0; i < dictOids.size(); i++)
    {
      oids[dictOids[i].dictOID] = dictOids[i].dictOID;
    }
  }

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  vector<LBID_t> lbidList;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    // XXX THIS IS WRONG!!!
    // save the extent info to mark them invalid, after flush, the meta file will be gone.
    std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap = fWEWrapper.getTxnMap();

    try
    {
      auto mapIter = m_txnLBIDMap.find(txnId);

      if (mapIter != m_txnLBIDMap.end())
      {
        lbidList = mapIter->second->m_LBIDs;
      }
    }
    catch (...)
    {
      // Best effort: without a record there is nothing to invalidate later.
    }
  }

  // No need to close files afterwards, flushDataFiles will close them.
  error = fWEWrapper.flushDataFiles(flushCode, txnId, oids);

  if (error != NO_ERROR)
  {
    rc = error;
    WErrorCodes ec;
    err = ec.errorString(error);
  }

  // Erase the row group of this transaction from the rowGroups map. Using
  // find/erase (instead of operator[]) avoids inserting a null entry for
  // transactions that never had a row group and does not leave a dead key
  // behind for those that did.
  auto rgIter = rowGroups.find(txnId);

  if (rgIter != rowGroups.end())
  {
    delete rgIter->second;
    rowGroups.erase(rgIter);
  }

  // Collect every column file recorded in the table meta data so the caches
  // can be purged below.
  TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
  ColsExtsInfoMap colsExtsInfoMap = tableMetaData->getColsExtsInfoMap();
  std::vector<BRM::FileInfo> files;
  std::vector<BRM::OID_t> oidsToFlush;
  BRM::FileInfo aFile;

  for (const auto& colEntry : colsExtsInfoMap)
  {
    aFile.oid = colEntry.first;
    oidsToFlush.push_back(aFile.oid);

    for (const auto& extInfo : colEntry.second)
    {
      aFile.partitionNum = extInfo.partNum;
      aFile.dbRoot = extInfo.dbRoot;
      aFile.segmentNum = extInfo.segNum;
      aFile.compType = extInfo.compType;
      files.push_back(aFile);
    }
  }

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    const int confirmRc = fWEWrapper.confirmTransaction(txnId);

    if (confirmRc != NO_ERROR)
    {
      WErrorCodes ec;

      if (error == NO_ERROR)
      {
        // First failure of this call: report it.
        rc = confirmRc;
        err = ec.errorString(confirmRc);
      }
      else
      {
        // The flush already failed; keep its return code and append the
        // confirm failure instead of masking the earlier error.
        err += "; ";
        err += ec.errorString(confirmRc);
      }
    }

    //@Bug 5700. Purge FD cache after file swap
    cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
    cacheutils::flushOIDsFromCache(oidsToFlush);
    fDbrm.invalidateUncommittedExtentLBIDs(0, false, &lbidList);
  }

  TableMetaData::removeTableMetaData(tableOid);

  // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
  CalpontSystemCatalog::removeCalpontSystemCatalog(txnId);
  CalpontSystemCatalog::removeCalpontSystemCatalog(txnId | 0x80000000);
  return rc;
}

//------------------------------------------------------------------------------
// Process one delete message for a transaction.
// The stream carries the PM id, session id, transaction id, schema name and
// table name.  When no RowGroup is cached in rowGroups for the transaction
// yet, the message additionally carries the RowGroup meta data: it is cached
// and the transaction is started.  Otherwise the message carries an RGData
// payload; the row ids it holds are handed to fWEWrapper.deleteRow() for every
// column of the table (plus the AUX column when the table has one).
// bs            - incoming message (consumed)
// err           - receives the error text on failure
// PMId          - (out) PM id read from the message
// blocksChanged - (in/out) incremented once per column block touched by the
//                 rows of this row group (query stats)
// Returns 0 on success, 1 when a catalog lookup throws, DEAD_LOCK_ERROR or
// VB_OVERFLOW_ERROR for the matching BRM errors, otherwise the write engine
// code narrowed to a byte (forced to be non-zero for a failure).
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, std::string& err,
                                         ByteStream::quadbyte& PMId, uint64_t& blocksChanged)
{
  uint8_t rc = 0;
  uint32_t tmp32, sessionID;
  TxnID txnId;
  bs >> PMId;
  bs >> sessionID;
  bs >> tmp32;
  txnId = tmp32;
  string schema, tableName;
  bs >> schema;
  bs >> tableName;
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.setTransId(txnId);

  if (!rowGroups[txnId])  // meta data
  {
    rowGroups[txnId] = new rowgroup::RowGroup();
    rowGroups[txnId]->deserialize(bs);
    // If hdfs, call chunkmanager to set up

    // Keep the code in an int: the message lookup must see the full value,
    // not the byte that is returned to the caller.
    const int startRc = fWEWrapper.startTransaction(txnId);

    if (startRc != NO_ERROR)
    {
      WErrorCodes ec;
      err = ec.errorString(startRc);
      rc = static_cast<uint8_t>(startRc);

      // A non-zero code whose low byte is 0 must not be reported as success.
      if (rc == 0)
        rc = 1;
    }

    return rc;
  }

  // Row data message: attach the payload to the cached row group.
  rowgroup::RowGroup* rowGroup = rowGroups[txnId];
  rowgroup::RGData rgData;
  rgData.deserialize(bs);
  rowGroup->setData(&rgData);

  // get row ids
  rowgroup::Row row;
  rowGroup->initRow(&row);
  WriteEngine::RIDList rowIDList;
  CalpontSystemCatalog::RID rid;
  const uint32_t rowsThisRowgroup = rowGroup->getRowCount();
  uint16_t dbRoot, segment, blockNum;
  uint32_t partition;
  uint8_t extentNum;

  CalpontSystemCatalog::TableName aTableName;
  aTableName.schema = schema;
  aTableName.table = tableName;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  CalpontSystemCatalog::ROPair roPair;
  CalpontSystemCatalog::OID tableAUXColOid;
  CalpontSystemCatalog::RIDList tableRidList;

  try
  {
    roPair = systemCatalogPtr->tableRID(aTableName);
    tableRidList = systemCatalogPtr->columnRIDs(aTableName, true);
    tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(aTableName);
  }
  catch (exception& ex)
  {
    err = ex.what();
    return 1;
  }

  // querystats
  uint64_t relativeRID = 0;
  const uint32_t rowColCount = row.getColumnCount();
  boost::scoped_array<int> preBlkNums(new int[rowColCount]);
  boost::scoped_array<uint32_t> colWidth(new uint32_t[rowColCount]);

  // initialize: widths are capped at 8 bytes, except for DECIMAL/UDECIMAL
  // columns which are capped at datatypes::MAXDECIMALWIDTH.
  for (uint32_t j = 0; j < rowColCount; j++)
  {
    preBlkNums[j] = -1;
    colWidth[j] = row.getColumnWidth(j);
    const execplan::CalpontSystemCatalog::ColDataType colDataType = row.getColType(j);
    const bool isDecimal = (colDataType == execplan::CalpontSystemCatalog::DECIMAL ||
                            colDataType == execplan::CalpontSystemCatalog::UDECIMAL);

    if (colWidth[j] >= 8 && !isDecimal)
    {
      colWidth[j] = 8;
    }
    else if (colWidth[j] >= datatypes::MAXDECIMALWIDTH)
    {
      colWidth[j] = datatypes::MAXDECIMALWIDTH;
    }
  }

  // Get the file information from rowgroup
  dbRoot = rowGroup->getDBRoot();
  rowGroup->getLocation(&partition, &segment, &extentNum, &blockNum);
  WriteEngine::ColStructList colStructList;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::ColStruct colStruct;
  colStruct.fColPartition = partition;
  colStruct.fColSegment = segment;
  colStruct.fColDbRoot = dbRoot;

  for (unsigned i = 0; i < rowsThisRowgroup; i++)
  {
    rowGroup->getRow(i, &row);
    rid = row.getRid();
    relativeRID = rid - rowGroup->getBaseRid();
    rid = relativeRID;
    convertToRelativeRid(rid, extentNum, blockNum);
    rowIDList.push_back(rid);

    // populate stats.blocksChanged: count a block the first time a row id
    // lands in a block number higher than the last one seen for that column.
    for (uint32_t j = 0; j < rowColCount; j++)
    {
      const int blkNum = static_cast<int>(relativeRID / (BYTE_PER_BLOCK / colWidth[j]));

      if (blkNum > preBlkNums[j])
      {
        blocksChanged++;
        preBlkNums[j] = blkNum;
      }
    }
  }

  // MCOL-5021 Valid AUX column OID for a table is > 3000
  // Tables that were created before this feature was added will have
  // tableAUXColOid = 0
  const bool hasAUXCol = tableAUXColOid > 3000;

  try
  {
    for (unsigned i = 0; i < tableRidList.size(); i++)
    {
      CalpontSystemCatalog::ColType colType;
      colType = systemCatalogPtr->colType(tableRidList[i].objnum);
      colStruct.dataOid = tableRidList[i].objnum;
      colStruct.fCompressionType = colType.compressionType;
      colStruct.colDataType = colType.colDataType;

      const bool isDecimal = (colType.colDataType == CalpontSystemCatalog::DECIMAL ||
                              colType.colDataType == CalpontSystemCatalog::UDECIMAL);

      if (colType.colWidth > 8 && !isDecimal)  // token
      {
        colStruct.colWidth = 8;
        colStruct.tokenFlag = true;
      }
      else
      {
        colStruct.colWidth = colType.colWidth;
        colStruct.tokenFlag = false;
      }

      colStructList.push_back(colStruct);
      cscColTypeList.push_back(colType);
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    return 1;
  }

  if (hasAUXCol)
  {
    // The AUX column is described by the fixed execplan::AUX_COL_* constants
    // rather than by a catalog lookup.
    CalpontSystemCatalog::ColType colType;
    colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
    colType.colWidth = execplan::AUX_COL_WIDTH;
    colType.colDataType = execplan::AUX_COL_DATATYPE;
    colStruct.dataOid = tableAUXColOid;
    colStruct.tokenFlag = false;
    colStruct.fCompressionType = colType.compressionType;
    colStruct.colWidth = colType.colWidth;
    colStruct.colDataType = colType.colDataType;
    colStructList.push_back(colStruct);
    cscColTypeList.push_back(colType);
  }

  // deleteRow() takes one entry per extent; this message covers a single one.
  std::vector<ColStructList> colExtentsStruct;
  std::vector<CSCTypesList> colExtentsColType;
  std::vector<void*> colOldValueList;
  std::vector<RIDList> ridLists;
  colExtentsStruct.push_back(colStructList);
  colExtentsColType.push_back(cscColTypeList);
  ridLists.push_back(rowIDList);

  const int error = fWEWrapper.deleteRow(txnId, colExtentsColType, colExtentsStruct, colOldValueList,
                                         ridLists, roPair.objnum, hasAUXCol);

  if (error != NO_ERROR)
  {
    // cout << "WE Error code " << error << endl;
    WErrorCodes ec;
    err = ec.errorString(error);

    if (error == ERR_BRM_DEAD_LOCK)
    {
      rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
    }
    else if (error == ERR_BRM_VB_OVERFLOW)
    {
      rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
      err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
    }
    else
    {
      rc = static_cast<uint8_t>(error);

      // A non-zero code whose low byte is 0 must not be reported as success.
      if (rc == 0)
        rc = 1;
    }
  }

  return rc;
}

//------------------------------------------------------------------------------
// Reads a table OID from the message and calls
// BulkRollbackMgr::deleteMetaFile() for it.
// Returns 0 on success; if an exception is thrown its text is stored in err
// and 1 is returned.
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processRemoveMeta(messageqcpp::ByteStream& bs, std::string& err)
{
  try
  {
    uint32_t oidOfTable;
    bs >> oidOfTable;
    // std::cout << ": tableOID-" << oidOfTable << std::endl;

    BulkRollbackMgr::deleteMetaFile(oidOfTable);
  }
  catch (exception& ex)
  {
    err = ex.what();
    return 1;
  }

  return 0;
}

//------------------------------------------------------------------------------
// Process bulk rollback command.
// The message carries, in order: table lock id, table OID, table name and
// application name.  An empty table name is resolved from the system catalog
// using the table OID.
// Returns 0 on success, 2 if fWEWrapper.bulkRollback() reports an error (err is
// passed to that call), and 1 if an exception was caught (err holds its text).
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processBulkRollback(messageqcpp::ByteStream& bs, std::string& err)
{
  err.clear();

  try
  {
    uint64_t lockId;
    uint32_t oidOfTable;
    std::string fullTableName;
    std::string applName;

    // This progress line goes to stdout; it could be silenced later on,
    // but it is harmless to keep.
    std::cout << "processBulkRollback";
    bs >> lockId;
    bs >> oidOfTable;
    bs >> fullTableName;

    if (fullTableName.empty())
    {
      // Sender did not supply a name: look it up by OID.
      boost::shared_ptr<CalpontSystemCatalog> catalog = CalpontSystemCatalog::makeCalpontSystemCatalog(0);
      CalpontSystemCatalog::TableName resolvedName = catalog->tableName(oidOfTable);
      fullTableName = resolvedName.toString();
    }

    std::cout << "; table-" << fullTableName;

    bs >> applName;
    std::cout << "; app-" << applName << std::endl;

    const int rollbackRc = fWEWrapper.bulkRollback(oidOfTable, lockId, fullTableName, applName,
                                                   false,  // no extra debug logging to the console
                                                   err);

    if (rollbackRc != NO_ERROR)
      return 2;
  }
  catch (exception& ex)
  {
    std::cout << "processBulkRollback: exception-" << ex.what() << std::endl;
    err = ex.what();
    return 1;
  }

  return 0;
}

//------------------------------------------------------------------------------
// Process bulk rollback cleanup command (deletes bulk rollback meta data files).
// The message carries the table OID.
// Returns 0 on success; if an exception is caught, err receives its text and 1
// is returned.
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processBulkRollbackCleanup(messageqcpp::ByteStream& bs, std::string& err)
{
  err.clear();

  try
  {
    uint32_t oidOfTable;

    // This progress line goes to stdout; it could be silenced later on,
    // but it is harmless to keep.
    std::cout << "processBulkRollbackCleanup";
    bs >> oidOfTable;
    std::cout << ": tableOID-" << oidOfTable << std::endl;

    BulkRollbackMgr::deleteMetaFile(oidOfTable);
  }
  catch (exception& ex)
  {
    std::cout << "processBulkRollbackCleanup: exception-" << ex.what() << std::endl;
    err = ex.what();
    return 1;
  }

  return 0;
}

//------------------------------------------------------------------------------
// Update the next value stored for a column.
// The message carries, in order: column OID, next value and session id.  The
// session id is used as the transaction id for every write engine call here.
// When IDBPolicy::useHdfs() is true the data files are flushed and the
// transaction is confirmed and ended here as well.
// Returns 0 on success and a non-zero byte on failure; err receives a
// description of the first failure.
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::updateSyscolumnNextval(ByteStream& bs, std::string& err)
{
  uint32_t columnOid, sessionID;
  uint64_t nextVal;
  int rc = 0;
  bs >> columnOid;
  bs >> nextVal;
  bs >> sessionID;
  uint16_t dbRoot;
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;
  oids[columnOid] = columnOid;
  // oidsToFlush.push_back(columnOid);

  // NOTE(review): the results of getSysCatDBRoot() and startTransaction() are
  // not checked here (unchanged from the original) - confirm whether a failure
  // of either should abort the update.
  BRM::OID_t oid = 1021;
  fDbrm.getSysCatDBRoot(oid, dbRoot);
  fWEWrapper.setTransId(sessionID);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(sessionID);
  // cout << "updateSyscolumnNextval startTransaction id " << sessionID << endl;
  rc = fWEWrapper.updateNextValue(sessionID, columnOid, nextVal, sessionID, dbRoot);

  if (rc != 0)
  {
    err = "Error in WE::updateNextValue";
    rc = 1;
  }

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    cout << "updateSyscolumnNextval flushDataFiles  " << endl;
    int rc1 = fWEWrapper.flushDataFiles(rc, sessionID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      cout << "updateSyscolumnNextval confirmTransaction rc =0  " << endl;
      rc1 = fWEWrapper.confirmTransaction(sessionID);
      cout << "updateSyscolumnNextval confirmTransaction return code is  " << rc1 << endl;

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(sessionID, true);
      else
        fWEWrapper.endTransaction(sessionID, false);
    }
    else
    {
      cout << "updateSyscolumnNextval endTransaction with error  " << endl;
      fWEWrapper.endTransaction(sessionID, false);
    }

    // The update itself worked but flush/confirm/end did not: report that
    // failure, including its text (err was left empty before).
    if ((rc == NO_ERROR) && (rc1 != NO_ERROR))
    {
      WErrorCodes ec;
      err = ec.errorString(rc1);
      rc = rc1;
    }
  }

  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);

  // rc is an int but the result is a byte: make sure a failure code whose low
  // byte is 0 is not returned as success.
  const uint8_t result = static_cast<uint8_t>(rc);
  return ((rc != NO_ERROR) && (result == 0)) ? 1 : result;
}

//------------------------------------------------------------------------------
// Reads a table OID from the message, collects one BRM::FileInfo per extent
// entry recorded in that table's TableMetaData and, when IDBPolicy::useHdfs()
// is true and at least one file was collected, passes the list to
// cacheutils::purgePrimProcFdCache().  The table's TableMetaData is removed
// afterwards.
// err is not written by this function.  Always returns 0.
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processPurgeFDCache(ByteStream& bs, std::string& err)
{
  uint32_t tableOid;
  bs >> tableOid;

  TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
  ColsExtsInfoMap extentsByOid = tableMetaData->getColsExtsInfoMap();
  std::vector<BRM::FileInfo> files;
  BRM::FileInfo fileInfo;

  for (ColsExtsInfoMap::iterator colIt = extentsByOid.begin(); colIt != extentsByOid.end(); colIt++)
  {
    // The map key is stored as the file's OID.
    fileInfo.oid = colIt->first;

    for (ColExtsInfo::iterator extIt = (colIt->second).begin(); extIt != (colIt->second).end(); extIt++)
    {
      fileInfo.partitionNum = extIt->partNum;
      fileInfo.dbRoot = extIt->dbRoot;
      fileInfo.segmentNum = extIt->segNum;
      fileInfo.compType = extIt->compType;
      files.push_back(fileInfo);
      // cout <<"Added to files oid:dbroot:part:seg:compType = " <<
      // fileInfo.oid<<":"<<fileInfo.dbRoot<<":"<<fileInfo.partitionNum<<":"<<fileInfo.segmentNum
      //<<":"<<fileInfo.compType <<endl;
    }
  }

  if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0))
  {
    cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
  }

  TableMetaData::removeTableMetaData(tableOid);
  return 0;
}

//------------------------------------------------------------------------------
// Process a "fix rows" message: hand a list of row ids to
// fWEWrapper.deleteBadRows() for every column of a table.
// The message carries, in order: PM id, first-batch flag, session id,
// transaction id, schema name, table name, DBRoot, partition, segment and the
// row id list.  The transaction is started (and the wrapper flags are set)
// only when the first-batch flag is set.
// bs   - incoming message (consumed)
// err  - receives the error text on failure
// PMId - (out) PM id read from the message
// Returns 0 on success, 1 when an exception is caught, DEAD_LOCK_ERROR or
// VB_OVERFLOW_ERROR for the matching BRM errors, otherwise the write engine
// code narrowed to a byte (forced to be non-zero for a failure).
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processFixRows(messageqcpp::ByteStream& bs, std::string& err,
                                          ByteStream::quadbyte& PMId)
{
  uint8_t rc = 0;
  // cout << " In processFixRows" << endl;
  uint32_t tmp32;
  uint64_t sessionID;
  uint16_t dbRoot, segment;
  uint32_t partition;
  string schema, tableName;
  TxnID txnId;
  uint8_t tmp8;
  WriteEngine::RIDList rowIDList;
  bs >> PMId;
  bs >> tmp8;
  const bool firstBatch = (tmp8 != 0);
  bs >> sessionID;
  bs >> tmp32;
  txnId = tmp32;

  bs >> schema;
  bs >> tableName;
  bs >> dbRoot;
  bs >> partition;
  bs >> segment;

  deserializeInlineVector(bs, rowIDList);

  // Need to identify whether this is the first batch to start transaction.
  if (firstBatch)
  {
    // Keep the code in an int so the message lookup sees the full value.
    const int startRc = fWEWrapper.startTransaction(txnId);

    if (startRc != NO_ERROR)
    {
      WErrorCodes ec;
      err = ec.errorString(startRc);
      rc = static_cast<uint8_t>(startRc);
      // A non-zero code whose low byte is 0 must not be reported as success.
      return (rc != 0) ? rc : 1;
    }

    fWEWrapper.setIsInsert(false);
    fWEWrapper.setBulkFlag(false);
    fWEWrapper.setTransId(txnId);
    fWEWrapper.setFixFlag(true);
  }

  CalpontSystemCatalog::TableName aTableName;
  aTableName.schema = schema;
  aTableName.table = tableName;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);

  CalpontSystemCatalog::ROPair roPair;
  CalpontSystemCatalog::RIDList tableRidList;

  try
  {
    roPair = systemCatalogPtr->tableRID(aTableName);
    tableRidList = systemCatalogPtr->columnRIDs(aTableName, true);
  }
  catch (exception& ex)
  {
    err = ex.what();
    return 1;
  }

  WriteEngine::ColStructList colStructList;
  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStructList dctnryStructList;

  // Target the segment file named in the message.  These used to be
  // hard-coded to partition 0 / segment 0 / DBRoot 3, which ignored the
  // location that was just deserialized.
  // NOTE(review): confirm that every sender fills in DBRoot, partition and
  // segment with the location of the rows to fix.
  colStruct.fColPartition = partition;
  colStruct.fColSegment = segment;
  colStruct.fColDbRoot = dbRoot;
  // should we always scan dictionary store files?

  try
  {
    for (unsigned i = 0; i < tableRidList.size(); i++)
    {
      CalpontSystemCatalog::ColType colType;
      colType = systemCatalogPtr->colType(tableRidList[i].objnum);
      colStruct.dataOid = tableRidList[i].objnum;
      colStruct.tokenFlag = false;
      colStruct.fCompressionType = colType.compressionType;
      WriteEngine::DctnryStruct dctnryStruct;
      dctnryStruct.fColDbRoot = colStruct.fColDbRoot;
      dctnryStruct.fColPartition = colStruct.fColPartition;
      dctnryStruct.fColSegment = colStruct.fColSegment;
      dctnryStruct.fCompressionType = colStruct.fCompressionType;
      dctnryStruct.dctnryOid = 0;

      dctnryStruct.fCharsetNumber = colType.charsetNumber;

      // Same rule as processDelete(): a column wider than 8 bytes is a
      // dictionary token unless it is a DECIMAL/UDECIMAL, which keeps its
      // own width.
      const bool isDecimal = (colType.colDataType == CalpontSystemCatalog::DECIMAL ||
                              colType.colDataType == CalpontSystemCatalog::UDECIMAL);

      if (colType.colWidth > 8 && !isDecimal)  // token
      {
        colStruct.colWidth = 8;
        colStruct.tokenFlag = true;
        dctnryStruct.dctnryOid = colType.ddn.dictOID;
      }
      else
      {
        colStruct.colWidth = colType.colWidth;
      }

      colStruct.colDataType = colType.colDataType;

      colStructList.push_back(colStruct);
      dctnryStructList.push_back(dctnryStruct);
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    return 1;
  }

  try
  {
    const int error = fWEWrapper.deleteBadRows(txnId, colStructList, rowIDList, dctnryStructList);

    if (error != NO_ERROR)
    {
      // cout << "WE Error code " << error << endl;
      WErrorCodes ec;
      err = ec.errorString(error);

      if (error == ERR_BRM_DEAD_LOCK)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
      }
      else if (error == ERR_BRM_VB_OVERFLOW)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
        err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
      }
      else
      {
        rc = static_cast<uint8_t>(error);

        // A non-zero code whose low byte is 0 must not be reported as success.
        if (rc == 0)
          rc = 1;
      }
    }
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
  }

  // cout << "WES return rc " << (int)rc << " with msg " << err << endl;
  return rc;
}

//------------------------------------------------------------------------------
// Ends a transaction.
// The message carries the transaction id followed by one byte; a non-zero
// byte is passed to fWEWrapper.endTransaction() as success = true.
// Returns 0 on success.  On failure err receives the text for the write
// engine code and the code is returned narrowed to a byte (forced to be
// non-zero).
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processEndTransaction(ByteStream& bs, std::string& err)
{
  uint32_t txnid;
  ByteStream::byte tmp8;
  bs >> txnid;
  bs >> tmp8;
  const bool success = (tmp8 != 0);

  const int weRc = fWEWrapper.endTransaction(txnid, success);

  if (weRc == NO_ERROR)
    return 0;

  WErrorCodes ec;
  err = ec.errorString(weRc);

  // The code is an int but the result is a byte: a failure whose low byte is
  // 0 must not be returned as success.
  const uint8_t rc = static_cast<uint8_t>(weRc);
  return (rc != 0) ? rc : 1;
}
//------------------------------------------------------------------------------
// Validates the correctness of the current HWMs for this table.
// The HWMs for all the 1 byte columns should be identical.  Same goes
// for all the 2 byte columns, etc.  The 2 byte column HWMs should be
// "roughly" (but not necessarily exactly) twice that of a 1 byte column.
// Same goes for the 4 byte column HWMs vs their 2 byte counterparts, etc.
// ridList - columns oids to be used to get column width on to use with validation.
// segFileInfo - Vector of File objects carrying current DBRoot, partition,
//            HWM, etc to be validated for the columns belonging to jobTable.
// stage    - Current stage we are validating.  "Starting" or "Ending".
//------------------------------------------------------------------------------
int WE_DMLCommandProc::validateColumnHWMs(CalpontSystemCatalog::RIDList& ridList,
                                          boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr,
                                          const std::vector<DBRootExtentInfo>& segFileInfo, const char* stage,
                                          bool hasAuxCol)
{
  int rc = NO_ERROR;

  if ((!fIsFirstBatchPm) && (strcmp(stage, "Starting") == 0))
    return rc;

  // Used to track first 1-byte, 2-byte, 4-byte, 8-byte and 16-byte columns in table
  int byte1First = -1;
  int byte2First = -1;
  int byte4First = -1;
  int byte8First = -1;
  int byte16First = -1;

  // Make sure the HWMs for all 1-byte columns match; same for all 2-byte,
  // 4-byte, 8-byte, and 16-byte columns as well.
  CalpontSystemCatalog::ColType colType;
  Convertor convertor;
  uint32_t colWidth;

  for (unsigned k = 0; k < segFileInfo.size(); k++)
  {
    unsigned k1 = 0;

    // Find out column width
    if (hasAuxCol && (k == segFileInfo.size() - 1))
    {
      colWidth = execplan::AUX_COL_WIDTH;
    }
    else
    {
      colType = systemCatalogPtr->colType(ridList[k].objnum);
      colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
    }

    // Find the first 1-byte, 2-byte, 4-byte, 8-byte and 16-byte columns.
    // Use those as our reference HWM for the respective column widths.
    switch (colWidth)
    {
      case 1:
      {
        if (byte1First == -1)
          byte1First = k;

        k1 = byte1First;
        break;
      }

      case 2:
      {
        if (byte2First == -1)
          byte2First = k;

        k1 = byte2First;
        break;
      }

      case 4:
      {
        if (byte4First == -1)
          byte4First = k;

        k1 = byte4First;
        break;
      }

      case 16:
      {
        if (byte16First == -1)
          byte16First = k;

        k1 = byte16First;
        break;
      }

      case 8:
      default:
      {
        if (byte8First == -1)
          byte8First = k;

        k1 = byte8First;
        break;
      }
    }  // end of switch based on column width (1,2,4,8 or 16)

    // std::cout << "dbg: comparing0 " << stage << " refcol-" << k1 <<
    //  "; wid-" << jobColK1.width << "; hwm-" << segFileInfo[k1].fLocalHwm <<
    //  " <to> col-" << k <<
    //  "; wid-" << jobColK.width << " ; hwm-"<<segFileInfo[k].fLocalHwm<<std::endl;

    // Validate that the HWM for this column (k) matches that of the
    // corresponding reference column with the same width.
    if ((segFileInfo[k1].fDbRoot != segFileInfo[k].fDbRoot) ||
        (segFileInfo[k1].fPartition != segFileInfo[k].fPartition) ||
        (segFileInfo[k1].fSegment != segFileInfo[k].fSegment) ||
        (segFileInfo[k1].fLocalHwm != segFileInfo[k].fLocalHwm))
    {
      uint32_t colWidth2;

      if (hasAuxCol && (k1 == segFileInfo.size() - 1))
      {
        colWidth2 = execplan::AUX_COL_WIDTH;
      }
      else
      {
        CalpontSystemCatalog::ColType colType2;
        colType2 = systemCatalogPtr->colType(ridList[k1].objnum);
        colWidth2 = colType2.colWidth;
      }

      ostringstream oss;
      oss << stage
          << " HWMs do not match for"
             " OID1-"
          << ridList[k1].objnum << "; DBRoot-" << segFileInfo[k1].fDbRoot << "; partition-"
          << segFileInfo[k1].fPartition << "; segment-" << segFileInfo[k1].fSegment << "; hwm-"
          << segFileInfo[k1].fLocalHwm << "; width-" << colWidth2 << ':' << std::endl
          << " and OID2-" << ridList[k].objnum << "; DBRoot-" << segFileInfo[k].fDbRoot << "; partition-"
          << segFileInfo[k].fPartition << "; segment-" << segFileInfo[k].fSegment << "; hwm-"
          << segFileInfo[k].fLocalHwm << "; width-" << colType.colWidth;
      fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR);
      return ERR_BRM_HWMS_NOT_EQUAL;
    }

    // HWM DBRoot, partition, and segment number should match for all
    // columns; so compare DBRoot, part#, and seg# with first column.
    if ((segFileInfo[0].fDbRoot != segFileInfo[k].fDbRoot) ||
        (segFileInfo[0].fPartition != segFileInfo[k].fPartition) ||
        (segFileInfo[0].fSegment != segFileInfo[k].fSegment))
    {
      CalpontSystemCatalog::ColType colType2;
      colType2 = systemCatalogPtr->colType(ridList[0].objnum);
      ostringstream oss;
      oss << stage
          << " HWM DBRoot,Part#, or Seg# do not match for"
             " OID1-"
          << ridList[0].objnum << "; DBRoot-" << segFileInfo[0].fDbRoot << "; partition-"
          << segFileInfo[0].fPartition << "; segment-" << segFileInfo[0].fSegment << "; hwm-"
          << segFileInfo[0].fLocalHwm << "; width-" << colType2.colWidth << ':' << std::endl
          << " and OID2-" << ridList[k].objnum << "; DBRoot-" << segFileInfo[k].fDbRoot << "; partition-"
          << segFileInfo[k].fPartition << "; segment-" << segFileInfo[k].fSegment << "; hwm-"
          << segFileInfo[k].fLocalHwm << "; width-" << colType.colWidth;
      fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR);
      return ERR_BRM_HWMS_NOT_EQUAL;
    }
  }  // end of loop to compare all 1-byte HWMs, 2-byte HWMs, etc.

  // Validate/compare HWM for 1-byte column in relation to 2-byte column, etc.
  // Without knowing the exact row count, we can't extrapolate the exact HWM
  // for the wider column, but we can narrow it down to an expected range.
  unsigned refCol = 0;
  unsigned colIdx = 0;

  // Validate/compare HWMs given a 1-byte column as a starting point
  if (byte1First >= 0)
  {
    refCol = byte1First;

    if (byte2First >= 0)
    {
      HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 2;
      HWM hwmHi = hwmLo + 1;

      if ((segFileInfo[byte2First].fLocalHwm < hwmLo) || (segFileInfo[byte2First].fLocalHwm > hwmHi))
      {
        colIdx = byte2First;
        rc = ERR_BRM_HWMS_OUT_OF_SYNC;
        goto errorCheck;
      }
    }

    if (byte4First >= 0)
    {
      HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 4;
      HWM hwmHi = hwmLo + 3;

      if ((segFileInfo[byte4First].fLocalHwm < hwmLo) || (segFileInfo[byte4First].fLocalHwm > hwmHi))
      {
        colIdx = byte4First;
        rc = ERR_BRM_HWMS_OUT_OF_SYNC;
        goto errorCheck;
      }
    }

    if (byte8First >= 0)
    {
      HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 8;
      HWM hwmHi = hwmLo + 7;

      if ((segFileInfo[byte8First].fLocalHwm < hwmLo) || (segFileInfo[byte8First].fLocalHwm > hwmHi))
      {
        colIdx = byte8First;
        rc = ERR_BRM_HWMS_OUT_OF_SYNC;
        goto errorCheck;
      }
    }
  }

  // Validate/compare HWMs given a 2-byte column as a starting point
  if (byte2First >= 0)
  {
    refCol = byte2First;

    if (byte4First >= 0)
    {
      HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 2;
      HWM hwmHi = hwmLo + 1;

      if ((segFileInfo[byte4First].fLocalHwm < hwmLo) || (segFileInfo[byte4First].fLocalHwm > hwmHi))
      {
        colIdx = byte4First;
        rc = ERR_BRM_HWMS_OUT_OF_SYNC;
        goto errorCheck;
      }
    }

    if (byte8First >= 0)
    {
      HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 4;
      HWM hwmHi = hwmLo + 3;

      if ((segFileInfo[byte8First].fLocalHwm < hwmLo) || (segFileInfo[byte8First].fLocalHwm > hwmHi))
      {
        colIdx = byte8First;
        rc = ERR_BRM_HWMS_OUT_OF_SYNC;
        goto errorCheck;
      }
    }
  }

  // Validate/compare HWMs given a 4-byte column as a starting point
  if (byte4First >= 0)
  {
    refCol = byte4First;

    if (byte8First >= 0)
    {
      HWM hwmLo = segFileInfo[byte4First].fLocalHwm * 2;
      HWM hwmHi = hwmLo + 1;

      if ((segFileInfo[byte8First].fLocalHwm < hwmLo) || (segFileInfo[byte8First].fLocalHwm > hwmHi))
      {
        colIdx = byte8First;
        rc = ERR_BRM_HWMS_OUT_OF_SYNC;
        goto errorCheck;
      }
    }
  }

// To avoid repeating this message 6 times in the preceding source code, we
// use the "dreaded" goto to branch to this single place for error handling.
errorCheck:

  if (rc != NO_ERROR)
  {
    uint32_t colWidth1, colWidth2;

    if (hasAuxCol && (refCol == ridList.size() - 1))
    {
      colWidth1 = execplan::AUX_COL_WIDTH;
    }
    else
    {
      CalpontSystemCatalog::ColType colType1 = systemCatalogPtr->colType(ridList[refCol].objnum);
      colWidth1 = convertor.getCorrectRowWidth(colType1.colDataType, colType1.colWidth);
    }

    if (hasAuxCol && (colIdx == ridList.size() - 1))
    {
      colWidth2 = execplan::AUX_COL_WIDTH;
    }
    else
    {
      CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[colIdx].objnum);
      colWidth2 = convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth);
    }

    ostringstream oss;
    oss << stage
        << " HWMs are not in sync for"
           " OID1-"
        << ridList[refCol].objnum << "; DBRoot-" << segFileInfo[refCol].fDbRoot << "; partition-"
        << segFileInfo[refCol].fPartition << "; segment-" << segFileInfo[refCol].fSegment << "; hwm-"
        << segFileInfo[refCol].fLocalHwm << "; width-" << colWidth1 << ':' << std::endl
        << " and OID2-" << ridList[colIdx].objnum << "; DBRoot-" << segFileInfo[colIdx].fDbRoot
        << "; partition-" << segFileInfo[colIdx].fPartition << "; segment-" << segFileInfo[colIdx].fSegment
        << "; hwm-" << segFileInfo[colIdx].fLocalHwm << "; width-" << colWidth2;
    fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR);
  }

  return rc;
}
}  // namespace WriteEngine
